diff --git a/DESIGN.md b/DESIGN.md
index 20e1bf6..64e572d 100644
--- a/DESIGN.md
+++ b/DESIGN.md
@@ -8,7 +8,9 @@
- cron 表达式:标准 5 字段 cron 表达式调度
- 一次性任务:执行后自动移除
- 任务暂停/恢复:通过 Redis 标记控制任务启停
-- **注解驱动**: 使用 `#[Crontab]` 注解声明任务,支持自动扫描发现
+- **注解驱动**: 使用 `#[Crontab]` 注解声明任务,启动时自动扫描发现
+- **事件驱动**: 通过 kiri-core PSR-14 事件系统分发任务生命周期事件
+- **幽灵任务清理**: 启动时自动清理 Redis 中已从配置移除的任务
## 二、核心组件
@@ -18,10 +20,16 @@
├─────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────────┐ │
-│ │CrontabCommand │───▶│ CrontabScheduler │ │
-│ │ (控制台命令) │ │ (调度引擎) │ │
+│ │CrontabCommand │───▶│ CrontabProcess │ │
+│ │ (Symfony命令) │ │ (继承AbstractProcess) │
│ └──────────────┘ └────────┬─────────┘ │
│ │ │
+│ ▼ │
+│ ┌──────────────────┐ │
+│ │ CrontabScheduler │ │
+│ │ (继承Component) │ │
+│ └────────┬─────────┘ │
+│ │ │
│ ┌─────────▼─────────┐ │
│ │ TaskRegistry │ │
│ │ (任务注册中心) │ │
@@ -41,6 +49,11 @@
│ │ └──────────────┘ │ │
│ └────────────────────────────────────────────────────┘ │
│ │
+│ ┌────────────────────────────────────────────────────┐ │
+│ │ kiri-core 事件系统 (PSR-14) │ │
+│ │ OnTaskBeforeExecute → OnTaskExecuted → OnTaskFailed│ │
+│ └────────────────────────────────────────────────────┘ │
+│ │
└─────────────────────────────────────────────────────────┘
```
@@ -65,7 +78,7 @@ Type: Hash
Fields:
class - 任务处理类完整路径 (如 App\Task\CleanLogTask)
name - 任务显示名称
- expression - 调度表达式 (every:60 | cron:*\/5 * * * * | daily:03:00 | at:1234567890)
+ expression - 调度表达式 (every:60 | cron:*/5 * * * * | daily:03:00 | at:1234567890)
next_run - 下次执行时间戳 (秒)
last_run - 上次执行时间戳 (秒)
status - 状态: active / paused / disabled
@@ -78,7 +91,7 @@ Fields:
```
Key: crontab:lock:task:{taskKey}
Type: String
-Value: Swoole Worker ID + Timestamp
+Value: Timestamp
TTL: 任务超时时间 (默认 300s)
```
@@ -90,7 +103,7 @@ TTL: 任务超时时间 (默认 300s)
Key: crontab:lock:master
Type: String
Value: Worker PID
-TTL: 60s (定期续期)
+TTL: 60s (并发执行期间由独立协程定期续期)
```
防止多实例同时调度,用 SET NX EX + Lua 脚本实现。
@@ -105,6 +118,17 @@ Member: 当前正在执行的任务 key 列表
用于监控哪些任务正在运行中。
+### 3.6 任务移除标记
+
+```
+Key: crontab:removal:{taskKey}
+Type: String
+Value: "1"
+TTL: 60s
+```
+
+当任务内部调用 cancelCurrentTask() 时设置此标记,执行完成后检查并清理。
+
## 四、调度表达式
| 格式 | 示例 | 含义 |
@@ -115,7 +139,7 @@ Member: 当前正在执行的任务 key 列表
| `every:{时}h` | `every:1h` | 每 1 小时执行 |
| `daily:{HH:MM}` | `daily:03:00` | 每天凌晨 3 点 |
| `hourly:{MM}` | `hourly:30` | 每小时第 30 分 |
-| `cron:{表达式}` | `cron:*\/5 * * * *` | 标准 5 字段 cron |
+| `cron:{表达式}` | `cron:*/5 * * * *` | 标准 5 字段 cron |
| `at:{时间戳}` | `at:1719590400` | 指定时间戳一次性 |
## 五、调度流程
@@ -129,6 +153,7 @@ Member: 当前正在执行的任务 key 列表
┌──────────────────────────────────────┐
│ 1. 扫描 TaskRegistry 注册的所有任务 │
│ 将未注册到 Redis 的任务写入 Hash │
+│ 清理 Redis 中已移除的幽灵任务 │
│ 将任务加入 ZSET 调度队列 │
└────────────────┬─────────────────────┘
│
@@ -151,18 +176,22 @@ Member: 当前正在执行的任务 key 列表
└──────────┬───────────┘ │
│ 有任务 │
▼ │
+ ┌──────────────────────────────────────┐ │
+ │ 4. 遍历到期任务 (协程并发) │ │
+ │ - 启动主锁续期协程 │ │
+ │ - 通过 Channel 控制最大并发数 │ │
+ │ - 获取任务锁 │ │
+ │ - 分发 OnTaskBeforeExecute 事件 │ │
+ │ - 执行任务 handle() │ │
+ │ - 分发 OnTaskExecuted/OnTaskFailed │ │
+ │ - 计算下次执行时间 │ │
+ │ - 更新 ZSET 分数 │ │
+ │ - 停止锁续期协程 │ │
+ └──────────────────────────────────────┘ │
+ │ │
+ ▼ │
┌──────────────────────┐ │
- │ 4. 遍历到期任务 │ │
- │ - 获取任务锁 │ │
- │ - 执行任务 handle() │ │
- │ - 记录日志/更新状态 │◀─────────────────────┘
- │ - 计算下次执行时间 │
- │ - 更新 ZSET 分数 │
- └──────────┬───────────┘
- │
- ▼
- ┌──────────────────────┐
- │ 5. sleep(1) 等待下个 │
+ │ 5. sleep(1) 等待下个 │◀─────────────────────┘
│ tick │
└──────────────────────┘
```
@@ -183,11 +212,31 @@ Member: 当前正在执行的任务 key 列表
active ──▶ paused ──▶ active (暂停/恢复)
active ──▶ disabled (禁用,从 ZSET 移除)
一次性任务执行完成后: 从 ZSET 和 Redis 中移除
+ 任务从配置中移除: 启动时自动清理 (幽灵任务清理)
```
-## 七、与 kiri-core 集成方式
+## 七、事件系统
-### 7.1 作为 Composer 包引入
+调度器通过 kiri-core PSR-14 事件系统在任务执行的各个阶段分发事件:
+
+| 事件 | 触发时机 | 参数 |
+|------|---------|------|
+| `OnTaskBeforeExecute` | 任务执行前 | taskKey, className, taskName |
+| `OnTaskExecuted` | 任务执行成功 | taskKey, className, taskName, duration, nextRun |
+| `OnTaskFailed` | 任务执行失败 | taskKey, className, taskName, error, duration, nextRun |
+
+监听器注册方式(config/events.php):
+```php
+return [
+ \Kiri\Crontab\Events\OnTaskFailed::class => [
+ [App\Listener\CrontabAlertListener::class, 'process'],
+ ],
+];
+```
+
+## 八、与 kiri-core 集成方式
+
+### 8.1 作为 Composer 包引入
```json
{
"require": {
@@ -196,22 +245,32 @@ Member: 当前正在执行的任务 key 列表
}
```
-### 7.2 通过 Provider 注册到框架
+### 8.2 通过 Provider 注册命令
+CrontabProviders 自动将 `sw:crontab` 命令注册到 Console Application。
+
+### 8.3 通过 Process 集成
```php
// config/servers.php 中添加
'process' => [
\Kiri\Crontab\CrontabProcess::class,
],
```
+CrontabProcess 继承 AbstractProcess,支持框架的生命周期管理(信号处理、优雅退出)。
-### 7.3 独立运行模式
+### 8.4 独立运行模式
```bash
php bin/crontab start
php bin/crontab stop
php bin/crontab restart
+php bin/crontab status
```
-### 7.4 注解驱动注册方式
+### 8.5 kiri-core 控制台模式
+```bash
+php kiri.php sw:crontab start|stop|restart|status
+```
+
+### 8.6 注解驱动注册方式
```php
use Kiri\Crontab\Annotate\Crontab;
use Kiri\Crontab\TaskInterface;
@@ -226,70 +285,75 @@ class CleanLogTask implements TaskInterface
}
```
-在配置中指定扫描路径:
-```php
-// config/crontab.php
-'scan_paths' => [
- 'app/Task',
- 'app/Crontab',
-],
-```
+任务类实现 TaskInterface 并使用 `#[Crontab]` 注解后,CrontabProcess 启动时自动通过 `get_declared_classes()` 发现并注册。
-## 八、注解扫描流程
+## 九、注解扫描流程
```
┌──────────────────────────────────────┐
-│ CrontabScanner::scan($directory) │
+│ CrontabProcess::discoverAnnotationTasks() │
└────────────────┬─────────────────────┘
│
▼
┌──────────────────────────────────────┐
-│ 1. 递归遍历目录下所有 PHP 文件 │
-│ 跳过 vendor/tests/cache/storage │
+│ 1. 遍历 get_declared_classes() │
+│ 遍历所有已加载的 PHP 类 │
└────────────────┬─────────────────────┘
│
▼
┌──────────────────────────────────────┐
-│ 2. require_once 加载文件 │
-│ get_declared_classes() 获取新类 │
-└────────────────┬─────────────────────┘
- │
- ▼
-┌──────────────────────────────────────┐
-│ 3. 检查类是否 implements TaskInterface│
+│ 2. 检查类是否 implements TaskInterface│
│ ──否──▶ 跳过 │
└────────────────┬─────────────────────┘
│ 是
▼
┌──────────────────────────────────────┐
-│ 4. ReflectionClass::getAttributes( │
+│ 3. ReflectionClass::getAttributes( │
│ Crontab::class) 读取注解 │
│ ──无──▶ 跳过 │
└────────────────┬─────────────────────┘
│ 有
▼
┌──────────────────────────────────────┐
-│ 5. $registry->register([ │
-│ 'class' => $className, │
-│ 'name' => $crontab->name, │
-│ 'expression' => $crontab->expr, │
-│ 'status' => $crontab->status, │
+│ 4. 调用 buildExpression() 生成表达式 │
+│ 验证表达式非空 │
+│ ──空──▶ 跳过 │
+└────────────────┬─────────────────────┘
+ │ 有效
+ ▼
+┌──────────────────────────────────────┐
+│ 5. TaskRegistry::register([ │
+│ 'class' => $className, │
+│ 'name' => $crontab->name, │
+│ 'expression' => $expr, │
+│ 'status' => $crontab->status│
│ ]); │
└──────────────────────────────────────┘
```
-## 九、依赖关系
+## 十、依赖关系
```
kiri-crontab
├── PHP >= 8.5
-├── ext-swoole (协程/进程)
-├── ext-redis (Redis 客户端)
-├── psr/log (PSR-3 日志)
-└── symfony/console (可选,如集成 kiri-core 则复用项目已有的)
+├── ext-swoole (协程/进程)
+├── ext-redis (Redis 客户端)
+├── psr/log (PSR-3 日志)
+├── psr/event-dispatcher (PSR-14 事件)
+└── symfony/console (命令行集成)
```
-## 十、目录结构
+与 kiri-core 框架的关系:
+```
+kiri-crontab (外挂包)
+├── 继承 Kiri\Abstracts\Component (CrontabScheduler)
+├── 继承 Kiri\Abstracts\Providers (CrontabProviders)
+├── 继承 Kiri\Server\Processes\AbstractProcess (CrontabProcess)
+├── 使用 Kiri\Events\EventDispatch (事件分发)
+└── 集成 Symfony\Component\Console\Command\Command
+```
+
+## 十一、目录结构
```
kiri-crontab/
@@ -304,12 +368,12 @@ kiri-crontab/
│ ├── TaskInterface.php # 任务接口
│ ├── TaskConfig.php # 任务配置值对象
│ ├── TaskRegistry.php # 任务注册中心
-│ ├── CrontabScheduler.php # 核心调度引擎
-│ ├── CrontabProcess.php # Swoole 进程适配器
-│ ├── CrontabCommand.php # 控制台命令
-│ ├── CrontabScanner.php # 注解任务扫描器
+│ ├── CrontabScheduler.php # 核心调度引擎 (继承 Component)
+│ ├── CrontabProcess.php # Swoole 进程适配器 (继承 AbstractProcess)
+│ ├── CrontabCommand.php # Symfony 控制台命令
│ ├── CrontabProviders.php # kiri-core Provider 集成
│ ├── CronExpression.php # Cron 表达式解析器
+│ ├── functions.php # 全局辅助函数
│ ├── Annotate/
│ │ └── Crontab.php # #[Crontab] 注解类
│ └── Events/
diff --git a/bin/crontab b/bin/crontab
index eef96bd..7f84faa 100644
--- a/bin/crontab
+++ b/bin/crontab
@@ -11,10 +11,9 @@
*
* 任务定义方式:
* 1. 配置文件: config/crontab.php 的 tasks 中声明
- * 2. 注解自动扫描: 在任务类上使用 #[Crontab] 注解,配置 scan_paths 目录
+ * 2. 注解自动扫描: 在任务类上使用 #[Crontab] 注解
*
- * 确保项目根目录有 config/crontab.php 配置文件
- * 并在其中定义 Redis 连接 (redis)、调度参数 (scheduler) 和扫描路径 (scan_paths)
+ * CrontabProcess 启动时会自动完成配置任务注册和注解扫描
*/
declare(strict_types=1);
@@ -41,14 +40,16 @@ if ($autoloadPath === null) {
require $autoloadPath;
use Kiri\Crontab\CrontabCommand;
-use Kiri\Crontab\TaskRegistry;
+use Symfony\Component\Console\Application;
+use Symfony\Component\Console\Input\ArgvInput;
+use Symfony\Component\Console\Output\ConsoleOutput;
// 解析命令行参数
$args = $argv;
$script = array_shift($args);
$action = array_shift($args) ?: 'status';
-// 查找配置文件
+// 查找配置文件,获取 PID 文件路径
$configFiles = [
getcwd() . '/config/crontab.php',
__DIR__ . '/../config/crontab.php',
@@ -73,60 +74,19 @@ if (!is_array($config)) {
exit(1);
}
-// 初始化任务注册中心 (静态注册表)
-// 1. 注册配置文件中的任务
-$taskList = $config['tasks'] ?? [];
-foreach ($taskList as $taskConfig) {
- try {
- TaskRegistry::register($taskConfig);
- } catch (\Throwable $throwable) {
- fwrite(STDERR, "任务注册失败: {$throwable->getMessage()}" . PHP_EOL);
- }
-}
+// PID 文件路径
+$pidFile = $config['scheduler']['pid_file'] ?? (getcwd() . '/storage/crontab.pid');
-// 2. 发现注解任务 (检查类上的 #[Crontab] 注解)
-foreach (get_declared_classes() as $className) {
- try {
- $reflect = new ReflectionClass($className);
- if ($reflect->isAbstract()) continue;
- if (!in_array(Kiri\Crontab\TaskInterface::class, class_implements($className), true)) continue;
+// 创建 Console Application 并执行
+$application = new Application('kiri-crontab', '1.0.0');
+$application->setAutoExit(false);
- $attributes = $reflect->getAttributes(Kiri\Crontab\Annotate\Crontab::class);
- if (empty($attributes)) continue;
+$command = new CrontabCommand($pidFile);
+$command->setName('crontab');
+$application->add($command);
- $instance = $attributes[0]->newInstance();
- $expr = $instance->buildExpression();
- if ($expr === '') continue;
+$input = new ArgvInput(['crontab', $action]);
+$output = new ConsoleOutput();
- Kiri\Crontab\TaskRegistry::register([
- 'class' => $className,
- 'name' => $instance->name !== '' ? $instance->name : $className,
- 'expression' => $expr,
- 'status' => $instance->status,
- ]);
- } catch (\Throwable) {}
-}
-
-echo "[Crontab] 任务总数: " . TaskRegistry::count() . PHP_EOL;
-
-// PID 文件和日志文件
-$pidFile = $config['scheduler']['pid_file'] ?? (getcwd() . '/storage/crontab.pid');
-$logFile = $config['scheduler']['log_file'] ?? '';
-
-// 创建命令实例
-$command = new CrontabCommand($pidFile, $logFile);
-
-// 路由到对应操作
-try {
- match ($action) {
- 'start' => $command->start($config),
- 'stop' => $command->stop(),
- 'restart' => $command->restart($config),
- 'status' => $command->status(),
- default => fwrite(STDERR, "未知操作: {$action} (支持: start|stop|restart|status)" . PHP_EOL),
- };
-} catch (\Throwable $throwable) {
- fwrite(STDERR, "执行失败: {$throwable->getMessage()}" . PHP_EOL);
- fwrite(STDERR, "{$throwable->getTraceAsString()}" . PHP_EOL);
- exit(1);
-}
+$exitCode = $application->run($input, $output);
+exit($exitCode);
diff --git a/composer.json b/composer.json
index 6389c7b..c24cbfc 100644
--- a/composer.json
+++ b/composer.json
@@ -13,7 +13,8 @@
"php": ">=8.5",
"ext-swoole": "*",
"ext-redis": "*",
- "psr/log": "^1.0"
+ "psr/log": "^1.0",
+ "psr/event-dispatcher": "^1.0"
},
"suggest": {
"symfony/console": "如需集成 kiri-core 或使用命令行管理,建议安装 ^v8.0",
@@ -22,7 +23,10 @@
"autoload": {
"psr-4": {
"Kiri\\Crontab\\": "src/"
- }
+ },
+ "files": [
+ "src/functions.php"
+ ]
},
"bin": [
"bin/crontab"
diff --git a/src/CrontabCommand.php b/src/CrontabCommand.php
index cde32b8..a05bc55 100644
--- a/src/CrontabCommand.php
+++ b/src/CrontabCommand.php
@@ -3,24 +3,36 @@ declare(strict_types=1);
namespace Kiri\Crontab;
+use Swoole\Process;
+use Symfony\Component\Console\Command\Command;
+use Symfony\Component\Console\Input\InputArgument;
+use Symfony\Component\Console\Input\InputInterface;
+use Symfony\Component\Console\Output\OutputInterface;
+
/**
- * Crontab 控制台命令 — 独立模式下的启动/停止/管理命令
+ * Crontab 控制台命令 — 管理调度器的启动/停止/重启/状态
*
- * 支持的命令:
- * php bin/crontab start 启动调度器
- * php bin/crontab stop 停止调度器
- * php bin/crontab restart 重启调度器
- * php bin/crontab status 查看调度状态
+ * 使用方式:
+ * php kiri.php sw:crontab start
+ * php kiri.php sw:crontab stop
+ * php kiri.php sw:crontab restart
+ * php kiri.php sw:crontab status
+ *
+ * 独立模式:
+ * php bin/crontab start|stop|restart|status
*/
-class CrontabCommand
+class CrontabCommand extends Command
{
+ /** @var string 命令名称 */
+ public string $command = 'sw:crontab';
+
+ /** @var string 命令描述 */
+ public string $description = '管理 crontab 调度器: start|stop|restart|status';
+
/** @var string PID 文件路径 */
private string $pidFile;
- /** @var string 日志文件路径 */
- private string $logFile;
-
/**
* @param string $pidFile PID 文件路径
* @param string $logFile 日志文件路径
@@ -29,68 +41,120 @@ class CrontabCommand
string $pidFile = '',
string $logFile = '',
) {
+ parent::__construct();
$this->pidFile = $pidFile !== '' ? $pidFile : sys_get_temp_dir() . '/crontab.pid';
- $this->logFile = $logFile;
+ }
+
+ /**
+ * 配置命令参数
+ */
+ protected function configure(): void
+ {
+ $this->setName('sw:crontab')
+ ->addArgument('action', InputArgument::OPTIONAL, 'start|stop|restart|status', 'status')
+ ->setDescription('./snowflake sw:crontab start|stop|restart|status');
+ }
+
+ /**
+ * 执行命令
+ *
+ * @param InputInterface $input 输入
+ * @param OutputInterface $output 输出
+ * @return int 退出码
+ */
+ public function execute(InputInterface $input, OutputInterface $output): int
+ {
+ $action = $input->getArgument('action');
+
+ try {
+ match ($action) {
+ 'start' => $this->start($output),
+ 'stop' => $this->stop($output),
+ 'restart' => $this->restart($output),
+ 'status' => $this->status($output),
+ default => $output->writeln("未知操作: {$action} (支持: start|stop|restart|status)"),
+ };
+ } catch (\Throwable $throwable) {
+ $output->writeln("执行失败: {$throwable->getMessage()}");
+ return Command::FAILURE;
+ }
+
+ return Command::SUCCESS;
}
/**
* 启动调度器进程
*/
- public function start(array $config): void
+ private function start(OutputInterface $output): void
{
if ($this->isRunning()) {
- echo "[Crontab] 调度器已在运行中, PID: " . $this->getPid() . PHP_EOL;
+ $output->writeln("Crontab 调度器已在运行中, PID: " . $this->getPid() . "");
return;
}
$count = TaskRegistry::count();
if ($count === 0) {
- echo "[Crontab] 警告: 没有注册任何任务" . PHP_EOL;
+ $output->writeln("Crontab 警告: 没有注册任何任务");
}
- echo "[Crontab] 启动调度器..." . PHP_EOL;
- echo "[Crontab] 已注册 {$count} 个任务" . PHP_EOL;
+ $output->writeln("Crontab 启动调度器...");
+ $output->writeln("Crontab 已注册 {$count} 个任务");
- $process = new \Swoole\Process(function (\Swoole\Process $worker) use ($config) {
- file_put_contents($this->pidFile, (string)$worker->pid);
+ $config = $this->loadConfig();
+
+ $pidFile = $this->pidFile;
+
+ $process = new Process(function (Process $worker) use ($config, $pidFile) {
+ file_put_contents($pidFile, (string)$worker->pid);
+
+ // 注册配置文件中声明的任务
+ $taskList = $config['tasks'] ?? [];
+ foreach ($taskList as $taskConfig) {
+ try {
+ TaskRegistry::register($taskConfig);
+ } catch (\Throwable $throwable) {
+ fwrite(STDERR, "任务注册失败: {$throwable->getMessage()}" . PHP_EOL);
+ }
+ }
$crontabProcess = new CrontabProcess($config);
- $crontabProcess->run();
+ $crontabProcess->onShutdown($worker)->process($worker);
}, false, 0, true);
$pid = $process->start();
- echo "[Crontab] 调度器已启动, PID: {$pid}" . PHP_EOL;
- echo "[Crontab] PID 文件: {$this->pidFile}" . PHP_EOL;
+ $output->writeln("Crontab 调度器已启动, PID: {$pid}");
+ $output->writeln("Crontab PID 文件: {$pidFile}");
- \Swoole\Process::wait();
+ Process::wait();
}
/**
* 停止调度器进程
*/
- public function stop(): void
+ private function stop(OutputInterface $output): void
{
if (!$this->isRunning()) {
- echo "[Crontab] 调度器未运行" . PHP_EOL;
+ $output->writeln("Crontab 调度器未运行");
return;
}
$pid = $this->getPid();
- if ($pid > 0 && \Swoole\Process::kill($pid, 0)) {
- \Swoole\Process::kill($pid, SIGTERM);
- echo "[Crontab] 已发送停止信号, PID: {$pid}" . PHP_EOL;
+ if ($pid > 0 && Process::kill($pid, 0)) {
+ Process::kill($pid, SIGTERM);
+ $output->writeln("Crontab 已发送停止信号, PID: {$pid}");
+ // 等待进程优雅退出,最多等待 10 秒
$timeout = 10;
- while ($timeout > 0 && \Swoole\Process::kill($pid, 0)) {
+ while ($timeout > 0 && Process::kill($pid, 0)) {
usleep(200000);
$timeout--;
}
- if (\Swoole\Process::kill($pid, 0)) {
- \Swoole\Process::kill($pid, SIGKILL);
- echo "[Crontab] 进程未响应, 已强制终止" . PHP_EOL;
+ if (Process::kill($pid, 0)) {
+ Process::kill($pid, SIGKILL);
+ $output->writeln("Crontab 进程未响应, 已强制终止");
}
}
@@ -98,34 +162,58 @@ class CrontabCommand
unlink($this->pidFile);
}
- echo "[Crontab] 调度器已停止" . PHP_EOL;
+ $output->writeln("Crontab 调度器已停止");
}
/**
* 重启调度器进程
*/
- public function restart(array $config): void
+ private function restart(OutputInterface $output): void
{
- echo "[Crontab] 重启调度器..." . PHP_EOL;
- $this->stop();
+ $output->writeln("Crontab 重启调度器...");
+ $this->stop($output);
usleep(500000);
- $this->start($config);
+ $this->start($output);
}
/**
* 查看调度器状态
*/
- public function status(): void
+ private function status(OutputInterface $output): void
{
if ($this->isRunning()) {
$pid = $this->getPid();
- echo "[Crontab] 状态: 运行中" . PHP_EOL;
- echo "[Crontab] PID: {$pid}" . PHP_EOL;
+ $output->writeln("Crontab 状态: 运行中");
+ $output->writeln("Crontab PID: {$pid}");
} else {
- echo "[Crontab] 状态: 未运行" . PHP_EOL;
+ $output->writeln("Crontab 状态: 未运行");
}
}
+ /**
+ * 加载 crontab 配置文件
+ *
+ * @return array 配置数组
+ */
+ private function loadConfig(): array
+ {
+ $configFiles = [
+ getcwd() . '/config/crontab.php',
+ dirname(__DIR__) . '/config/crontab.php',
+ ];
+
+ foreach ($configFiles as $file) {
+ if (file_exists($file)) {
+ $config = require $file;
+ if (is_array($config)) {
+ return $config;
+ }
+ }
+ }
+
+ throw new \RuntimeException('未找到配置文件 config/crontab.php');
+ }
+
/**
* 检查调度器是否在运行
*/
@@ -135,7 +223,7 @@ class CrontabCommand
if ($pid <= 0) {
return false;
}
- return \Swoole\Process::kill($pid, 0);
+ return Process::kill($pid, 0);
}
/**
diff --git a/src/CrontabProcess.php b/src/CrontabProcess.php
index 65900f1..4db485e 100644
--- a/src/CrontabProcess.php
+++ b/src/CrontabProcess.php
@@ -3,196 +3,224 @@ declare(strict_types=1);
namespace Kiri\Crontab;
+use Kiri\Server\Processes\AbstractProcess;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Swoole\Coroutine;
+use Swoole\Process;
/**
- * Crontab Swoole 进程 — 作为独立进程运行任务调度器
+ * Crontab 自定义进程 — 继承框架统一 Process 基类
*
* 两种运行模式:
- * 1. 独立模式: 直接调用 run() 启动,自建 Swoole 事件循环
- * 2. kiri-core 集成模式: 注册为自定义 Process,由 kiri-http-server 管理生命周期
+ * 1. kiri-core 集成模式: 通过 config/servers.php 的 process 数组注册
+ * → TraitProcess::genProcess 创建 Swoole\Process → onShutdown → process()
+ * 2. 独立模式: bin/crontab 直接 fork 子进程,调用 run() → process()
*
* 任务注册方式:
- * - 配置模式: config/crontab.php 的 tasks 中声明
- * - 注解模式: 任务类上使用 #[Crontab] 注解 + CrontabScanner 自动扫描
+ * - 配置模式: config/crontab.php 的 tasks 中声明 (自动注册)
+ * - 注解模式: 任务类上使用 #[Crontab] 注解 (启动时自动发现)
* 两种方式可同时使用
*
- * 独立模式示例:
- * $process = new CrontabProcess($config, $registry);
- * $process->run();
+ * kiri-core 集成示例:
+ * // config/servers.php
+ * 'process' => [
+ * \Kiri\Crontab\CrontabProcess::class,
+ * ],
*
- * kiri-core 集成模式:
- * 在 config/servers.php 的 process 中添加 CrontabProcess::class 即可
+ * 独立模式示例:
+ * php bin/crontab start
*/
-class CrontabProcess
+class CrontabProcess extends AbstractProcess
{
- private ?CrontabScheduler $scheduler = null;
+ /** @var bool 启用协程信号等待 */
+ protected bool $enable_coroutine = true;
- /**
- * @param array $config 配置数组 (crontab.php 的完整内容)
- * @param LoggerInterface|null $logger 日志记录器
- */
- public function __construct(
- private array $config,
- private ?LoggerInterface $logger = null,
- ) {
- if ($this->logger === null) {
- $this->logger = new NullLogger();
- }
- }
+ private ?CrontabScheduler $scheduler = null;
- /**
- * 扫描注解任务并启动进程 (独立模式)
- *
- * @throws \Throwable
- */
- public function run(): void
- {
- $this->logger->info('[CrontabProcess] 进程启动中...');
+ /** @var LoggerInterface|null 日志记录器 (独立模式兜底) */
+ private ?LoggerInterface $logger;
- $this->discoverAnnotationTasks();
+ /**
+ * @param array $config 配置数组 (crontab.php 的完整内容)
+ * 空数组时尝试从 kiri-core 配置系统加载
+ * @param LoggerInterface|null $logger 日志记录器
+ */
+ public function __construct(private array $config = [], ?LoggerInterface $logger = null)
+ {
+ parent::__construct();
+ $this->logger = $logger ?? new NullLogger;
- Coroutine::run(function () {
- $this->startScheduler();
- });
+ // kiri-core 集成模式下,通过配置系统加载 crontab 配置
+ if (empty($this->config) && function_exists('config')) {
+ $this->config = config('crontab', []);
+ }
+ }
- $this->logger->info('[CrontabProcess] 进程已退出');
- }
+ /**
+ * 进程名称
+ */
+ public function getName(): string
+ {
+ return "Crontab Scheduler";
+ }
- /**
- * 在 kiri-core 自定义进程中启动调度器
- * 由框架的 AbstractProcess 生命周期调用
- *
- * @throws \Throwable
- */
- public function boot(): void
- {
- $this->logger->info('[CrontabProcess] 在 kiri-core 进程内启动...');
+ /**
+ * kiri-core 集成入口 — 由 AbstractProcess::onShutdown 调用
+ * 在独立的 Swoole Process 中启动任务调度器
+ *
+ * @param Process|null $process Swoole 进程对象
+ */
+ public function process(?Process $process): void
+ {
+ $this->logger->info('[CrontabProcess] 进程启动中...');
- $this->discoverAnnotationTasks();
+ $this->registerConfigTasks();
+ $this->discoverAnnotationTasks();
+ $this->startScheduler();
- $this->startScheduler();
- }
+ $this->logger->info('[CrontabProcess] 进程已退出');
+ }
- /**
- * 获取当前调度器实例
- */
- public function getScheduler(): ?CrontabScheduler
- {
- return $this->scheduler;
- }
+ /**
+ * 收到 SIGTERM 信号时优雅停止调度器
+ */
+ public function onSigterm(): void
+ {
+ $this->logger->info('[CrontabProcess] 收到停止信号,正在关闭调度器...');
+ $this->scheduler?->stop();
+ }
- /**
- * 从已声明的类中发现 #[Crontab] 类注解并注册到 TaskRegistry
- * CrontabProcess 启动时自动调用,检查所有已加载的类
- */
- private function discoverAnnotationTasks(): void
- {
- $beforeCount = TaskRegistry::count();
+ /**
+ * 独立模式入口 — 在 Coroutine::run 中启动调度器
+ * 供 bin/crontab 独立运行模式使用
+ *
+ * @throws \Throwable
+ */
+ public function run(): void
+ {
+ Coroutine\run(fn() => $this->process(null));
+ }
- foreach (get_declared_classes() as $className) {
- if (!in_array(TaskInterface::class, class_implements($className), true)) {
- continue;
- }
+ /**
+ * 获取当前调度器实例
+ */
+ public function getScheduler(): ?CrontabScheduler
+ {
+ return $this->scheduler;
+ }
- try {
- $reflect = new \ReflectionClass($className);
- if ($reflect->isAbstract()) {
- continue;
- }
+ /**
+ * 注册配置文件 (config/crontab.php) 中声明的任务
+ */
+ private function registerConfigTasks(): void
+ {
+ $taskList = $this->config['tasks'] ?? [];
+ foreach ($taskList as $taskConfig) {
+ try {
+ TaskRegistry::register($taskConfig);
+ } catch (\Throwable $throwable) {
+ $this->logger->warning("[CrontabProcess] 配置任务注册失败: {$throwable->getMessage()}");
+ }
+ }
+ }
- // 读取类上的 #[Crontab] 注解
- $attributes = $reflect->getAttributes(Annotate\Crontab::class);
- if (empty($attributes)) {
- continue;
- }
+ /**
+ * 从已声明的类中发现 #[Crontab] 类注解并注册到 TaskRegistry
+ * 启动时自动调用,检查所有已加载的类
+ */
+ private function discoverAnnotationTasks(): void
+ {
+ $beforeCount = TaskRegistry::count();
- /** @var Annotate\Crontab $instance */
- $instance = $attributes[0]->newInstance();
+ foreach (get_declared_classes() as $className) {
+ if (!in_array(TaskInterface::class, class_implements($className), true)) {
+ continue;
+ }
- $scheduleExpression = $instance->buildExpression();
- if ($scheduleExpression === '') {
- continue;
- }
+ try {
+ $reflect = new \ReflectionClass($className);
+ if ($reflect->isAbstract()) {
+ continue;
+ }
- TaskRegistry::register([
- 'class' => $className,
- 'name' => $instance->name !== '' ? $instance->name : $className,
- 'expression' => $scheduleExpression,
- 'status' => $instance->status,
- ]);
- } catch (\Throwable) {
- // 跳过无法反射或注册失败的类
- }
- }
+ $attributes = $reflect->getAttributes(Annotate\Crontab::class);
+ if (empty($attributes)) {
+ continue;
+ }
- $afterCount = TaskRegistry::count();
- if ($afterCount > $beforeCount) {
- $this->logger->info("[CrontabProcess] 发现注解任务 " . ($afterCount - $beforeCount) . " 个");
- }
- }
+ /** @var Annotate\Crontab $instance */
+ $instance = $attributes[0]->newInstance();
- /**
- * 初始化并启动调度器
- */
- private function startScheduler(): void
- {
- $redis = $this->createRedisConnection();
- $cronExpression = new CronExpression();
+ $scheduleExpression = $instance->buildExpression();
+ if ($scheduleExpression === '') {
+ continue;
+ }
- $schedulerConfig = $this->config['scheduler'] ?? [];
+ TaskRegistry::register([
+ 'class' => $className,
+ 'name' => $instance->name !== '' ? $instance->name : $className,
+ 'expression' => $scheduleExpression,
+ 'status' => $instance->status,
+ ]);
+ } catch (\Throwable $throwable) {
+ $this->logger->warning("[CrontabProcess] 注解扫描失败: {$className} - {$throwable->getMessage()}");
+ }
+ }
- $this->scheduler = new CrontabScheduler(
- redis: $redis,
- cronExpression: $cronExpression,
- logger: $this->logger,
- tickInterval: (int)($schedulerConfig['tick_interval'] ?? 1),
- taskTimeout: (int)($schedulerConfig['task_timeout'] ?? 300),
- lockTtl: (int)($schedulerConfig['lock_ttl'] ?? 60),
- lockRenewInterval: (int)($schedulerConfig['lock_renew_interval'] ?? 15),
- concurrentTasks: (bool)($schedulerConfig['concurrent_tasks'] ?? true),
- maxConcurrent: (int)($schedulerConfig['max_concurrent'] ?? 10),
- );
+ $afterCount = TaskRegistry::count();
+ if ($afterCount > $beforeCount) {
+ $this->logger->info("[CrontabProcess] 发现注解任务 " . ($afterCount - $beforeCount) . " 个");
+ }
+ }
- $this->scheduler->start();
- }
+ /**
+ * 初始化并启动调度器
+ */
+ private function startScheduler(): void
+ {
+ $redis = $this->createRedisConnection();
+ $cronExpression = new CronExpression;
+ $schedulerConfig = $this->config['scheduler'] ?? [];
+ $this->scheduler = new CrontabScheduler(redis: $redis, cronExpression: $cronExpression, logger: $this->logger, tickInterval: (int)($schedulerConfig['tick_interval'] ?? 1), taskTimeout: (int)($schedulerConfig['task_timeout'] ?? 300), lockTtl: (int)($schedulerConfig['lock_ttl'] ?? 60), lockRenewInterval: (int)($schedulerConfig['lock_renew_interval'] ?? 15), concurrentTasks: (bool)($schedulerConfig['concurrent_tasks'] ?? true), maxConcurrent: (int)($schedulerConfig['max_concurrent'] ?? 10),);
- /**
- * 创建 Redis 连接
- * 在独立模式下直接创建连接,不通过 kiri-core 的连接池
- */
- private function createRedisConnection(): \Redis
- {
- $redisConfig = $this->config['redis'] ?? [];
+ $this->scheduler->start();
+ }
- $host = $redisConfig['host'] ?? '127.0.0.1';
- $port = (int)($redisConfig['port'] ?? 6379);
- $auth = $redisConfig['auth'] ?? '';
- $databases = (int)($redisConfig['databases'] ?? 0);
- $timeout = (int)($redisConfig['timeout'] ?? 30);
- $prefix = $redisConfig['prefix'] ?? '';
+ /**
+ * 创建 Redis 连接
+ * 在独立模式下直接创建连接,不通过 kiri-core 的连接池
+ */
+ private function createRedisConnection(): \Redis
+ {
+ $redisConfig = $this->config['redis'] ?? [];
- $redis = new \Redis();
- if (!$redis->connect($host, $port, $timeout)) {
- throw new \RuntimeException("Redis 连接失败: {$host}:{$port}");
- }
+ $host = $redisConfig['host'] ?? '127.0.0.1';
+ $port = (int)($redisConfig['port'] ?? 6379);
+ $auth = $redisConfig['auth'] ?? '';
+ $databases = (int)($redisConfig['databases'] ?? 0);
+ $timeout = (int)($redisConfig['timeout'] ?? 30);
+ $prefix = $redisConfig['prefix'] ?? '';
- if (!empty($auth) && !$redis->auth($auth)) {
- throw new \RuntimeException("Redis 认证失败: {$redis->getLastError()}");
- }
+ $redis = new \Redis;
+ if (!$redis->connect($host, $port, $timeout)) {
+ throw new \RuntimeException("Redis 连接失败: {$host}:{$port}");
+ }
- $redis->select($databases);
+ if (!empty($auth) && !$redis->auth($auth)) {
+ throw new \RuntimeException("Redis 认证失败: {$redis->getLastError()}");
+ }
- if (!empty($prefix)) {
- $redis->setOption(\Redis::OPT_PREFIX, $prefix);
- }
+ $redis->select($databases);
- $this->logger->info("[CrontabProcess] Redis 已连接: {$host}:{$port} DB:{$databases}");
+ if (!empty($prefix)) {
+ $redis->setOption(\Redis::OPT_PREFIX, $prefix);
+ }
- return $redis;
- }
+ $this->logger->info("[CrontabProcess] Redis 已连接: {$host}:{$port} DB:{$databases}");
+
+ return $redis;
+ }
}
diff --git a/src/CrontabProviders.php b/src/CrontabProviders.php
index 9e2bad5..de139c3 100644
--- a/src/CrontabProviders.php
+++ b/src/CrontabProviders.php
@@ -4,10 +4,12 @@ declare(strict_types=1);
namespace Kiri\Crontab;
use Kiri\Abstracts\Providers;
-use Symfony\Component\Console\Application;
+use Kiri\Application;
+use Psr\Container\ContainerExceptionInterface;
+use Psr\Container\NotFoundExceptionInterface;
/**
- * kiri-core 框架集成 Provider — 将 CrontabCommand 注册到 Symfony Console
+ * kiri-core 框架集成 Provider — 将 CrontabCommand 注册到 Console Application
*
* 在 kiri-core 项目的 config/servers.php 中配置:
* ```php
@@ -15,28 +17,21 @@ use Symfony\Component\Console\Application;
* \Kiri\Crontab\CrontabProcess::class,
* ],
* ```
- *
- * 在应用 Kernel 的 getCommands() 中添加:
- * ```php
- * public function getCommands(): array
- * {
- * return [
- * \Kiri\Crontab\CrontabCommand::class,
- * ];
- * }
- * ```
*/
class CrontabProviders extends Providers
{
/**
* 注册 CrontabCommand 到 Console Application
+ *
+ * @return void
+ * @throws ContainerExceptionInterface
+ * @throws NotFoundExceptionInterface
*/
public function onImport(): void
{
- $command = $this->container->get(CrontabCommand::class);
$console = $this->container->get(Application::class);
- $console->addCommand($command);
+ $console->command(CrontabCommand::class);
}
}
diff --git a/src/CrontabScheduler.php b/src/CrontabScheduler.php
index 126a3a2..89a0235 100644
--- a/src/CrontabScheduler.php
+++ b/src/CrontabScheduler.php
@@ -3,6 +3,10 @@ declare(strict_types=1);
namespace Kiri\Crontab;
+use Kiri\Abstracts\Component;
+use Kiri\Crontab\Events\OnTaskBeforeExecute;
+use Kiri\Crontab\Events\OnTaskExecuted;
+use Kiri\Crontab\Events\OnTaskFailed;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Swoole\Coroutine;
@@ -17,6 +21,7 @@ use Swoole\Coroutine;
* 4. 执行任务并更新下次调度时间
* 5. 支持协程并发执行多个到期任务
* 6. 支持运行时动态投递和取消任务
+ * 7. 通过 kiri-core 事件系统分发任务生命周期事件
*
* Redis 数据结构:
* crontab:queue — ZSET, score=下次执行时间戳, member=taskKey
@@ -25,7 +30,7 @@ use Swoole\Coroutine;
* crontab:lock:task:{key} — String, 任务执行锁
* crontab:running — SET, 当前执行中的任务
*/
-class CrontabScheduler
+class CrontabScheduler extends Component
{
/** @var string Redis key 前缀 */
@@ -49,19 +54,24 @@ class CrontabScheduler
/** @var int 默认主锁 TTL (秒) */
private const DEFAULT_LOCK_TTL = 60;
- private bool $running = false;
+ /** @var int 每次拉取到期任务的最大数量 */
+ private const MAX_DUE_TASKS_PER_TICK = 100;
- /** @var string|null 当前正在执行的任务 key (协程安全) */
- private ?string $currentTaskKey = null;
+ private bool $running = false;
/** @var self|null 全局实例引用,供任务内部访问 */
private static ?self $instance = null;
+ /** @var LoggerInterface|null 兜底日志,当容器未初始化时使用 */
+ private ?LoggerInterface $fallbackLogger;
+
+ /** @var string|null 非协程环境下当前任务 key 回退存储 */
+ private ?string $fallbackCurrentTaskKey = null;
+
/**
* @param \Redis $redis Redis 客户端
- * @param TaskRegistry $registry 任务注册中心
* @param CronExpression $cronExpression Cron 表达式解析器
- * @param LoggerInterface $logger 日志记录器
+ * @param LoggerInterface|null $logger 日志记录器(容器不可用时的兜底)
* @param int $tickInterval tick 间隔 (秒)
* @param int $taskTimeout 任务执行超时 (秒)
* @param int $lockTtl 主锁 TTL (秒)
@@ -72,7 +82,7 @@ class CrontabScheduler
public function __construct(
private \Redis $redis,
private CronExpression $cronExpression,
- private LoggerInterface $logger = new NullLogger(),
+ ?LoggerInterface $logger = null,
private int $tickInterval = self::DEFAULT_TICK_INTERVAL,
private int $taskTimeout = self::DEFAULT_TASK_TIMEOUT,
private int $lockTtl = self::DEFAULT_LOCK_TTL,
@@ -80,6 +90,8 @@ class CrontabScheduler
private bool $concurrentTasks = true,
private int $maxConcurrent = 10,
) {
+ parent::__construct();
+ $this->fallbackLogger = $logger;
self::$instance = $this;
}
@@ -102,7 +114,7 @@ class CrontabScheduler
{
$this->running = true;
- $this->logger->info('[CrontabScheduler] 调度器启动,注册 ' . TaskRegistry::count() . ' 个任务');
+ $this->logInfo('[CrontabScheduler] 调度器启动,注册 ' . TaskRegistry::count() . ' 个任务');
$this->syncTasks();
@@ -125,12 +137,12 @@ class CrontabScheduler
try {
$this->tick();
} catch (\Throwable $throwable) {
- $this->logger->error('[CrontabScheduler] tick 异常: ' . $throwable->getMessage());
+ $this->logError('[CrontabScheduler] tick 异常: ' . $throwable->getMessage());
}
}
self::$instance = null;
- $this->logger->info('[CrontabScheduler] 调度器已停止');
+ $this->logInfo('[CrontabScheduler] 调度器已停止');
}
/**
@@ -188,7 +200,7 @@ class CrontabScheduler
$this->persistNewTask($taskConfig);
- $this->logger->info("[CrontabScheduler] 动态投递任务: {$taskKey} '{$taskConfig->name}' '{$expression}'");
+ $this->logInfo("[CrontabScheduler] 动态投递任务: {$taskKey} '{$taskConfig->name}' '{$expression}'");
return $taskKey;
}
@@ -211,7 +223,7 @@ class CrontabScheduler
$this->redis->del($hashKey);
$this->redis->del($this->getTaskLockKey($taskKey));
- $this->logger->info("[CrontabScheduler] 任务已取消: {$taskKey}");
+ $this->logInfo("[CrontabScheduler] 任务已取消: {$taskKey}");
return true;
}
@@ -231,17 +243,37 @@ class CrontabScheduler
*/
public function cancelCurrentTask(): void
{
- if ($this->currentTaskKey !== null) {
- $this->markTaskForRemoval($this->currentTaskKey);
+ $taskKey = $this->getCurrentTaskKey();
+ if ($taskKey !== null) {
+ $this->markTaskForRemoval($taskKey);
}
}
/**
* 获取当前正在执行的任务 key (供任务内部使用)
+ * 协程环境通过 Swoole Coroutine Context 实现协程安全
+ * 非协程环境回退到实例属性
*/
public function getCurrentTaskKey(): ?string
{
- return $this->currentTaskKey;
+ $context = Coroutine::getContext();
+ if ($context === null) {
+ return $this->fallbackCurrentTaskKey;
+ }
+ return $context['crontab_current_task_key'] ?? null;
+ }
+
+ /**
+ * 设置当前协程正在执行的任务 key
+ */
+ private function setCurrentTaskKey(?string $taskKey): void
+ {
+ $context = Coroutine::getContext();
+ if ($context === null) {
+ $this->fallbackCurrentTaskKey = $taskKey;
+ return;
+ }
+ $context['crontab_current_task_key'] = $taskKey;
}
// ──────────────────────────────────────────────
@@ -250,11 +282,15 @@ class CrontabScheduler
/**
* 同步任务到 Redis
+ * 将 Registry 中的任务写入 Redis,同时清理 Redis 中已不在 Registry 的幽灵任务
*/
public function syncTasks(): void
{
+ $registryTaskKeys = [];
+
foreach (TaskRegistry::all() as $taskKey => $taskConfig) {
$hashKey = $this->getTaskHashKey($taskKey);
+ $registryTaskKeys[] = $hashKey;
if (!$this->redis->exists($hashKey)) {
$this->persistNewTask($taskConfig);
@@ -263,6 +299,41 @@ class CrontabScheduler
$this->mergeExistingTask($taskConfig);
}
+
+ $this->cleanOrphanedTasks($registryTaskKeys);
+ }
+
+ /**
+ * 清理 Redis 中已不在 Registry 的幽灵任务
+ *
+ * @param array $activeHashKeys 当前 Registry 中存在的 hash key 列表
+ */
+ private function cleanOrphanedTasks(array $activeHashKeys): void
+ {
+ $activeHashKeyMap = array_flip($activeHashKeys);
+
+ $redisTaskKeys = $this->redis->keys(self::KEY_PREFIX . ':task:*');
+ if (empty($redisTaskKeys)) {
+ return;
+ }
+
+ $removedCount = 0;
+ foreach ($redisTaskKeys as $hashKey) {
+ if (isset($activeHashKeyMap[$hashKey])) {
+ continue;
+ }
+
+ $taskKey = substr($hashKey, strlen(self::KEY_PREFIX . ':task:'));
+ $this->removeFromQueue($taskKey);
+ $this->redis->del($hashKey);
+ $this->redis->del($this->getTaskLockKey($taskKey));
+ $this->redis->del('crontab:removal:' . $taskKey);
+ $removedCount++;
+ }
+
+ if ($removedCount > 0) {
+ $this->logInfo("[CrontabScheduler] 清理幽灵任务 {$removedCount} 个");
+ }
}
/**
@@ -301,7 +372,7 @@ class CrontabScheduler
$this->redis->hSet($hashKey, 'status', 'paused');
$this->redis->zRem(self::QUEUE_KEY, $taskKey);
- $this->logger->info("[CrontabScheduler] 任务已暂停: {$taskKey}");
+ $this->logInfo("[CrontabScheduler] 任务已暂停: {$taskKey}");
return true;
}
@@ -328,7 +399,7 @@ class CrontabScheduler
$this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskKey);
}
- $this->logger->info("[CrontabScheduler] 任务已恢复: {$taskKey}");
+ $this->logInfo("[CrontabScheduler] 任务已恢复: {$taskKey}");
return true;
}
@@ -393,15 +464,17 @@ class CrontabScheduler
{
$now = time();
- $dueTaskKeys = $this->redis->zRangeByScore(self::QUEUE_KEY, '-inf', $now, ['limit' => [0, 100]]);
+ $dueTaskKeys = $this->redis->zRangeByScore(
+ self::QUEUE_KEY,
+ '-inf',
+ $now,
+ ['limit' => [0, self::MAX_DUE_TASKS_PER_TICK]]
+ );
if (empty($dueTaskKeys)) {
- $this->renewMasterLock();
return;
}
- $this->renewMasterLock();
-
if ($this->concurrentTasks) {
$this->executeTasksConcurrently($dueTaskKeys, $now);
} else {
@@ -413,13 +486,34 @@ class CrontabScheduler
/**
* 并发执行到期任务 (通过协程)
+ * 在执行期间持续续期主锁,防止并发任务耗时超过锁 TTL
+ * 非协程环境下自动回退到顺序执行
*/
private function executeTasksConcurrently(array $taskKeys, int $now): void
{
+ // 非协程环境回退到顺序执行,避免死锁
+ if (Coroutine::getCid() <= 0) {
+ foreach ($taskKeys as $taskKey) {
+ $this->executeSingleTask($taskKey, $now);
+ }
+ return;
+ }
+
$batchSize = min(count($taskKeys), $this->maxConcurrent);
$channel = new Coroutine\Channel($batchSize);
+ // 启动主锁续期协程,在并发任务执行期间持续续期
+ $stopRenewal = false;
+ Coroutine::create(function () use (&$stopRenewal) {
+ while (!$stopRenewal) {
+ Coroutine::sleep($this->lockRenewInterval);
+ if (!$stopRenewal) {
+ $this->renewMasterLock();
+ }
+ }
+ });
+
foreach ($taskKeys as $taskKey) {
$channel->push(true);
@@ -427,17 +521,21 @@ class CrontabScheduler
try {
$this->executeSingleTask($taskKey, $now);
} catch (\Throwable $throwable) {
- $this->logger->error("[CrontabScheduler] 并发执行异常: {$taskKey} - {$throwable->getMessage()}");
+ $this->logError("[CrontabScheduler] 并发执行异常: {$taskKey} - {$throwable->getMessage()}");
} finally {
$channel->pop();
}
});
}
+ // 等待所有协程完成
for ($i = 0; $i < $batchSize; $i++) {
$channel->push(true);
}
$channel->close();
+
+ // 停止锁续期协程
+ $stopRenewal = true;
}
/**
@@ -461,19 +559,25 @@ class CrontabScheduler
}
if (!$this->acquireTaskLock($taskKey)) {
- $this->logger->warning("[CrontabScheduler] 任务锁获取失败 (可能仍在执行中): {$taskKey}");
+ $this->logWarning("[CrontabScheduler] 任务锁获取失败 (可能仍在执行中): {$taskKey}");
return;
}
$startTime = microtime(true);
- // 记录当前任务 key,供任务内部通过 cancelCurrentTask() 取消自身
- $this->currentTaskKey = $taskKey;
+ // 通过 Swoole Coroutine Context 记录当前任务 key,确保协程安全
+ $this->setCurrentTaskKey($taskKey);
try {
$this->redis->sAdd(self::RUNNING_SET_KEY, $taskKey);
- $this->logger->info("[CrontabScheduler] 开始执行: {$taskKey} ({$config->name})");
+ $this->logInfo("[CrontabScheduler] 开始执行: {$taskKey} ({$config->name})");
+
+ $this->dispatchEvent(new OnTaskBeforeExecute(
+ taskKey: $taskKey,
+ className: $config->className,
+ taskName: $config->name,
+ ));
$className = $config->className;
if (!class_exists($className)) {
@@ -486,11 +590,15 @@ class CrontabScheduler
$duration = round(microtime(true) - $startTime, 4);
- $this->logger->info("[CrontabScheduler] 执行成功: {$taskKey} 耗时 {$duration}s");
+ $this->logInfo("[CrontabScheduler] 执行成功: {$taskKey} 耗时 {$duration}s");
+
+ $this->finalizeTaskSuccess($config, $now, $duration);
} catch (\Throwable $throwable) {
$duration = round(microtime(true) - $startTime, 4);
- $this->logger->error("[CrontabScheduler] 执行失败: {$taskKey} 耗时 {$duration}s 错误: {$throwable->getMessage()}");
+ $this->logError("[CrontabScheduler] 执行失败: {$taskKey} 耗时 {$duration}s 错误: {$throwable->getMessage()}");
+
+ $this->finalizeTaskFailure($config, $now, $duration, $throwable);
} finally {
$this->redis->sRem(self::RUNNING_SET_KEY, $taskKey);
@@ -502,20 +610,54 @@ class CrontabScheduler
$this->redis->del($hashKey);
$this->redis->del($this->getTaskLockKey($taskKey));
$this->clearRemovalFlag($taskKey);
- $this->logger->info("[CrontabScheduler] 任务已自毁: {$taskKey}");
+ $this->logInfo("[CrontabScheduler] 任务已自毁: {$taskKey}");
} else {
- $this->finalizeTaskExecution($config, $now);
+ $this->finalizeTaskScheduling($config, $now);
}
$this->releaseTaskLock($taskKey);
- $this->currentTaskKey = null;
+ $this->setCurrentTaskKey(null);
}
}
/**
- * 完成执行后的任务状态更新
+ * 任务执行成功后的生命周期处理
*/
- private function finalizeTaskExecution(TaskConfig $config, int $now): void
+ private function finalizeTaskSuccess(TaskConfig $config, int $now, float $duration): void
+ {
+ $nextRun = $this->cronExpression->getNextRunTime($config->expression, $now);
+ $isOneShot = $this->cronExpression->isOneShot($config->expression);
+
+ $this->dispatchEvent(new OnTaskExecuted(
+ taskKey: $config->taskKey,
+ className: $config->className,
+ taskName: $config->name,
+ duration: $duration,
+ nextRun: $isOneShot ? 0 : $nextRun,
+ ));
+ }
+
+ /**
+ * 任务执行失败后的生命周期处理
+ */
+ private function finalizeTaskFailure(TaskConfig $config, int $now, float $duration, \Throwable $error): void
+ {
+ $nextRun = $this->cronExpression->getNextRunTime($config->expression, $now);
+
+ $this->dispatchEvent(new OnTaskFailed(
+ taskKey: $config->taskKey,
+ className: $config->className,
+ taskName: $config->name,
+ error: $error,
+ duration: $duration,
+ nextRun: $nextRun,
+ ));
+ }
+
+ /**
+ * 完成执行后的任务调度状态更新
+ */
+ private function finalizeTaskScheduling(TaskConfig $config, int $now): void
{
$taskKey = $config->taskKey;
$hashKey = $this->getTaskHashKey($taskKey);
@@ -525,7 +667,7 @@ class CrontabScheduler
if ($isOneShot) {
$this->removeFromQueue($taskKey);
$this->redis->del($hashKey);
- $this->logger->info("[CrontabScheduler] 一次性任务已移除: {$taskKey}");
+ $this->logInfo("[CrontabScheduler] 一次性任务已移除: {$taskKey}");
return;
}
@@ -547,10 +689,23 @@ class CrontabScheduler
}
}
+ /**
+ * 通过 kiri-core 事件系统分发事件
+ * 容器不可用时静默跳过
+ */
+ private function dispatchEvent(object $event): void
+ {
+ try {
+ $this->getDispatch()->dispatch($event);
+ } catch (\Throwable) {
+ // 容器未初始化或事件系统不可用时跳过
+ }
+ }
+
/**
* 标记任务为"下次执行后移除"
* 当任务内部调用 cancelCurrentTask() 时,不立即删除(执行中操作安全),仅标记
- * 等当前执行完成后,executeSingleTask 的 finally 块会检查此标记并清理
+ * 等当前执行完成后,finalize 块会检查此标记并清理
*/
private function markTaskForRemoval(string $taskKey): void
{
@@ -621,7 +776,7 @@ class CrontabScheduler
$this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskConfig->taskKey);
}
- $this->logger->info("[CrontabScheduler] 新任务已注册: {$taskConfig->taskKey} 下次执行: " . date('Y-m-d H:i:s', $nextRun));
+ $this->logInfo("[CrontabScheduler] 新任务已注册: {$taskConfig->taskKey} 下次执行: " . date('Y-m-d H:i:s', $nextRun));
}
/**
@@ -680,4 +835,41 @@ class CrontabScheduler
return self::KEY_PREFIX . ':lock:task:' . $taskKey;
}
+ // ──────────────────────────────────────────────
+ // 日志辅助 — 兼容容器未初始化的场景
+ // ──────────────────────────────────────────────
+
+ private function logInfo(string $message): void
+ {
+ $this->log('info', $message);
+ }
+
+ private function logError(string $message): void
+ {
+ $this->log('error', $message);
+ }
+
+ private function logWarning(string $message): void
+ {
+ $this->log('warning', $message);
+ }
+
+ /**
+ * 统一日志输出入口
+ * 优先使用 Component::getLogger(),容器不可用时回退到注入的 fallbackLogger
+ */
+ private function log(string $level, string $message): void
+ {
+ try {
+ $logger = $this->getLogger();
+ $logger->{$level}($message);
+ return;
+ } catch (\Throwable) {
+ }
+
+ if ($this->fallbackLogger instanceof LoggerInterface) {
+ $this->fallbackLogger->{$level}($message);
+ }
+ }
+
}