eee
This commit is contained in:
+129
-41
@@ -3,24 +3,36 @@ declare(strict_types=1);
|
||||
|
||||
namespace Kiri\Crontab;
|
||||
|
||||
use Swoole\Process;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputArgument;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
|
||||
/**
|
||||
* Crontab 控制台命令 — 独立模式下的启动/停止/管理命令
|
||||
* Crontab 控制台命令 — 管理调度器的启动/停止/重启/状态
|
||||
*
|
||||
* 支持的命令:
|
||||
* php bin/crontab start 启动调度器
|
||||
* php bin/crontab stop 停止调度器
|
||||
* php bin/crontab restart 重启调度器
|
||||
* php bin/crontab status 查看调度状态
|
||||
* 使用方式:
|
||||
* php kiri.php sw:crontab start
|
||||
* php kiri.php sw:crontab stop
|
||||
* php kiri.php sw:crontab restart
|
||||
* php kiri.php sw:crontab status
|
||||
*
|
||||
* 独立模式:
|
||||
* php bin/crontab start|stop|restart|status
|
||||
*/
|
||||
class CrontabCommand
|
||||
class CrontabCommand extends Command
|
||||
{
|
||||
|
||||
/** @var string 命令名称 */
|
||||
public string $command = 'sw:crontab';
|
||||
|
||||
/** @var string 命令描述 */
|
||||
public string $description = '管理 crontab 调度器: start|stop|restart|status';
|
||||
|
||||
/** @var string PID 文件路径 */
|
||||
private string $pidFile;
|
||||
|
||||
/** @var string 日志文件路径 */
|
||||
private string $logFile;
|
||||
|
||||
/**
|
||||
* @param string $pidFile PID 文件路径
|
||||
* @param string $logFile 日志文件路径
|
||||
@@ -29,68 +41,120 @@ class CrontabCommand
|
||||
string $pidFile = '',
|
||||
string $logFile = '',
|
||||
) {
|
||||
parent::__construct();
|
||||
$this->pidFile = $pidFile !== '' ? $pidFile : sys_get_temp_dir() . '/crontab.pid';
|
||||
$this->logFile = $logFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* 配置命令参数
|
||||
*/
|
||||
protected function configure(): void
|
||||
{
|
||||
$this->setName('sw:crontab')
|
||||
->addArgument('action', InputArgument::OPTIONAL, 'start|stop|restart|status', 'status')
|
||||
->setDescription('./snowflake sw:crontab start|stop|restart|status');
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行命令
|
||||
*
|
||||
* @param InputInterface $input 输入
|
||||
* @param OutputInterface $output 输出
|
||||
* @return int 退出码
|
||||
*/
|
||||
public function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
$action = $input->getArgument('action');
|
||||
|
||||
try {
|
||||
match ($action) {
|
||||
'start' => $this->start($output),
|
||||
'stop' => $this->stop($output),
|
||||
'restart' => $this->restart($output),
|
||||
'status' => $this->status($output),
|
||||
default => $output->writeln("<error>未知操作: {$action} (支持: start|stop|restart|status)</error>"),
|
||||
};
|
||||
} catch (\Throwable $throwable) {
|
||||
$output->writeln("<error>执行失败: {$throwable->getMessage()}</error>");
|
||||
return Command::FAILURE;
|
||||
}
|
||||
|
||||
return Command::SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动调度器进程
|
||||
*/
|
||||
public function start(array $config): void
|
||||
private function start(OutputInterface $output): void
|
||||
{
|
||||
if ($this->isRunning()) {
|
||||
echo "[Crontab] 调度器已在运行中, PID: " . $this->getPid() . PHP_EOL;
|
||||
$output->writeln("<info>Crontab 调度器已在运行中, PID: " . $this->getPid() . "</info>");
|
||||
return;
|
||||
}
|
||||
|
||||
$count = TaskRegistry::count();
|
||||
if ($count === 0) {
|
||||
echo "[Crontab] 警告: 没有注册任何任务" . PHP_EOL;
|
||||
$output->writeln("<comment>Crontab 警告: 没有注册任何任务</comment>");
|
||||
}
|
||||
|
||||
echo "[Crontab] 启动调度器..." . PHP_EOL;
|
||||
echo "[Crontab] 已注册 {$count} 个任务" . PHP_EOL;
|
||||
$output->writeln("<info>Crontab 启动调度器...</info>");
|
||||
$output->writeln("<info>Crontab 已注册 {$count} 个任务</info>");
|
||||
|
||||
$process = new \Swoole\Process(function (\Swoole\Process $worker) use ($config) {
|
||||
file_put_contents($this->pidFile, (string)$worker->pid);
|
||||
$config = $this->loadConfig();
|
||||
|
||||
$pidFile = $this->pidFile;
|
||||
|
||||
$process = new Process(function (Process $worker) use ($config, $pidFile) {
|
||||
file_put_contents($pidFile, (string)$worker->pid);
|
||||
|
||||
// 注册配置文件中声明的任务
|
||||
$taskList = $config['tasks'] ?? [];
|
||||
foreach ($taskList as $taskConfig) {
|
||||
try {
|
||||
TaskRegistry::register($taskConfig);
|
||||
} catch (\Throwable $throwable) {
|
||||
fwrite(STDERR, "任务注册失败: {$throwable->getMessage()}" . PHP_EOL);
|
||||
}
|
||||
}
|
||||
|
||||
$crontabProcess = new CrontabProcess($config);
|
||||
$crontabProcess->run();
|
||||
$crontabProcess->onShutdown($worker)->process($worker);
|
||||
}, false, 0, true);
|
||||
|
||||
$pid = $process->start();
|
||||
|
||||
echo "[Crontab] 调度器已启动, PID: {$pid}" . PHP_EOL;
|
||||
echo "[Crontab] PID 文件: {$this->pidFile}" . PHP_EOL;
|
||||
$output->writeln("<info>Crontab 调度器已启动, PID: {$pid}</info>");
|
||||
$output->writeln("<info>Crontab PID 文件: {$pidFile}</info>");
|
||||
|
||||
\Swoole\Process::wait();
|
||||
Process::wait();
|
||||
}
|
||||
|
||||
/**
|
||||
* 停止调度器进程
|
||||
*/
|
||||
public function stop(): void
|
||||
private function stop(OutputInterface $output): void
|
||||
{
|
||||
if (!$this->isRunning()) {
|
||||
echo "[Crontab] 调度器未运行" . PHP_EOL;
|
||||
$output->writeln("<comment>Crontab 调度器未运行</comment>");
|
||||
return;
|
||||
}
|
||||
|
||||
$pid = $this->getPid();
|
||||
|
||||
if ($pid > 0 && \Swoole\Process::kill($pid, 0)) {
|
||||
\Swoole\Process::kill($pid, SIGTERM);
|
||||
echo "[Crontab] 已发送停止信号, PID: {$pid}" . PHP_EOL;
|
||||
if ($pid > 0 && Process::kill($pid, 0)) {
|
||||
Process::kill($pid, SIGTERM);
|
||||
$output->writeln("<info>Crontab 已发送停止信号, PID: {$pid}</info>");
|
||||
|
||||
// 等待进程优雅退出,最多等待 10 秒
|
||||
$timeout = 10;
|
||||
while ($timeout > 0 && \Swoole\Process::kill($pid, 0)) {
|
||||
while ($timeout > 0 && Process::kill($pid, 0)) {
|
||||
usleep(200000);
|
||||
$timeout--;
|
||||
}
|
||||
|
||||
if (\Swoole\Process::kill($pid, 0)) {
|
||||
\Swoole\Process::kill($pid, SIGKILL);
|
||||
echo "[Crontab] 进程未响应, 已强制终止" . PHP_EOL;
|
||||
if (Process::kill($pid, 0)) {
|
||||
Process::kill($pid, SIGKILL);
|
||||
$output->writeln("<comment>Crontab 进程未响应, 已强制终止</comment>");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,34 +162,58 @@ class CrontabCommand
|
||||
unlink($this->pidFile);
|
||||
}
|
||||
|
||||
echo "[Crontab] 调度器已停止" . PHP_EOL;
|
||||
$output->writeln("<info>Crontab 调度器已停止</info>");
|
||||
}
|
||||
|
||||
/**
|
||||
* 重启调度器进程
|
||||
*/
|
||||
public function restart(array $config): void
|
||||
private function restart(OutputInterface $output): void
|
||||
{
|
||||
echo "[Crontab] 重启调度器..." . PHP_EOL;
|
||||
$this->stop();
|
||||
$output->writeln("<info>Crontab 重启调度器...</info>");
|
||||
$this->stop($output);
|
||||
usleep(500000);
|
||||
$this->start($config);
|
||||
$this->start($output);
|
||||
}
|
||||
|
||||
/**
|
||||
* 查看调度器状态
|
||||
*/
|
||||
public function status(): void
|
||||
private function status(OutputInterface $output): void
|
||||
{
|
||||
if ($this->isRunning()) {
|
||||
$pid = $this->getPid();
|
||||
echo "[Crontab] 状态: 运行中" . PHP_EOL;
|
||||
echo "[Crontab] PID: {$pid}" . PHP_EOL;
|
||||
$output->writeln("<info>Crontab 状态: 运行中</info>");
|
||||
$output->writeln("<info>Crontab PID: {$pid}</info>");
|
||||
} else {
|
||||
echo "[Crontab] 状态: 未运行" . PHP_EOL;
|
||||
$output->writeln("<comment>Crontab 状态: 未运行</comment>");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 加载 crontab 配置文件
|
||||
*
|
||||
* @return array 配置数组
|
||||
*/
|
||||
private function loadConfig(): array
|
||||
{
|
||||
$configFiles = [
|
||||
getcwd() . '/config/crontab.php',
|
||||
dirname(__DIR__) . '/config/crontab.php',
|
||||
];
|
||||
|
||||
foreach ($configFiles as $file) {
|
||||
if (file_exists($file)) {
|
||||
$config = require $file;
|
||||
if (is_array($config)) {
|
||||
return $config;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
throw new \RuntimeException('未找到配置文件 config/crontab.php');
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查调度器是否在运行
|
||||
*/
|
||||
@@ -135,7 +223,7 @@ class CrontabCommand
|
||||
if ($pid <= 0) {
|
||||
return false;
|
||||
}
|
||||
return \Swoole\Process::kill($pid, 0);
|
||||
return Process::kill($pid, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
+175
-147
@@ -3,196 +3,224 @@ declare(strict_types=1);
|
||||
|
||||
namespace Kiri\Crontab;
|
||||
|
||||
use Kiri\Server\Processes\AbstractProcess;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
use Swoole\Coroutine;
|
||||
use Swoole\Process;
|
||||
|
||||
/**
|
||||
* Crontab Swoole 进程 — 作为独立进程运行任务调度器
|
||||
* Crontab 自定义进程 — 继承框架统一 Process 基类
|
||||
*
|
||||
* 两种运行模式:
|
||||
* 1. 独立模式: 直接调用 run() 启动,自建 Swoole 事件循环
|
||||
* 2. kiri-core 集成模式: 注册为自定义 Process,由 kiri-http-server 管理生命周期
|
||||
* 1. kiri-core 集成模式: 通过 config/servers.php 的 process 数组注册
|
||||
* → TraitProcess::genProcess 创建 Swoole\Process → onShutdown → process()
|
||||
* 2. 独立模式: bin/crontab 直接 fork 子进程,调用 run() → process()
|
||||
*
|
||||
* 任务注册方式:
|
||||
* - 配置模式: config/crontab.php 的 tasks 中声明
|
||||
* - 注解模式: 任务类上使用 #[Crontab] 注解 + CrontabScanner 自动扫描
|
||||
* - 配置模式: config/crontab.php 的 tasks 中声明 (自动注册)
|
||||
* - 注解模式: 任务类上使用 #[Crontab] 注解 (启动时自动发现)
|
||||
* 两种方式可同时使用
|
||||
*
|
||||
* 独立模式示例:
|
||||
* $process = new CrontabProcess($config, $registry);
|
||||
* $process->run();
|
||||
* kiri-core 集成示例:
|
||||
* // config/servers.php
|
||||
* 'process' => [
|
||||
* \Kiri\Crontab\CrontabProcess::class,
|
||||
* ],
|
||||
*
|
||||
* kiri-core 集成模式:
|
||||
* 在 config/servers.php 的 process 中添加 CrontabProcess::class 即可
|
||||
* 独立模式示例:
|
||||
* php bin/crontab start
|
||||
*/
|
||||
class CrontabProcess
|
||||
class CrontabProcess extends AbstractProcess
|
||||
{
|
||||
|
||||
private ?CrontabScheduler $scheduler = null;
|
||||
/** @var bool 启用协程信号等待 */
|
||||
protected bool $enable_coroutine = true;
|
||||
|
||||
/**
|
||||
* @param array $config 配置数组 (crontab.php 的完整内容)
|
||||
* @param LoggerInterface|null $logger 日志记录器
|
||||
*/
|
||||
public function __construct(
|
||||
private array $config,
|
||||
private ?LoggerInterface $logger = null,
|
||||
) {
|
||||
if ($this->logger === null) {
|
||||
$this->logger = new NullLogger();
|
||||
}
|
||||
}
|
||||
private ?CrontabScheduler $scheduler = null;
|
||||
|
||||
/**
|
||||
* 扫描注解任务并启动进程 (独立模式)
|
||||
*
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public function run(): void
|
||||
{
|
||||
$this->logger->info('[CrontabProcess] 进程启动中...');
|
||||
/** @var LoggerInterface|null 日志记录器 (独立模式兜底) */
|
||||
private ?LoggerInterface $logger;
|
||||
|
||||
$this->discoverAnnotationTasks();
|
||||
/**
|
||||
* @param array $config 配置数组 (crontab.php 的完整内容)
|
||||
* 空数组时尝试从 kiri-core 配置系统加载
|
||||
* @param LoggerInterface|null $logger 日志记录器
|
||||
*/
|
||||
public function __construct(private array $config = [], ?LoggerInterface $logger = null)
|
||||
{
|
||||
parent::__construct();
|
||||
$this->logger = $logger ?? new NullLogger;
|
||||
|
||||
Coroutine::run(function () {
|
||||
$this->startScheduler();
|
||||
});
|
||||
// kiri-core 集成模式下,通过配置系统加载 crontab 配置
|
||||
if (empty($this->config) && function_exists('config')) {
|
||||
$this->config = config('crontab', []);
|
||||
}
|
||||
}
|
||||
|
||||
$this->logger->info('[CrontabProcess] 进程已退出');
|
||||
}
|
||||
/**
|
||||
* 进程名称
|
||||
*/
|
||||
public function getName(): string
|
||||
{
|
||||
return "Crontab Scheduler";
|
||||
}
|
||||
|
||||
/**
|
||||
* 在 kiri-core 自定义进程中启动调度器
|
||||
* 由框架的 AbstractProcess 生命周期调用
|
||||
*
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public function boot(): void
|
||||
{
|
||||
$this->logger->info('[CrontabProcess] 在 kiri-core 进程内启动...');
|
||||
/**
|
||||
* kiri-core 集成入口 — 由 AbstractProcess::onShutdown 调用
|
||||
* 在独立的 Swoole Process 中启动任务调度器
|
||||
*
|
||||
* @param Process|null $process Swoole 进程对象
|
||||
*/
|
||||
public function process(?Process $process): void
|
||||
{
|
||||
$this->logger->info('[CrontabProcess] 进程启动中...');
|
||||
|
||||
$this->discoverAnnotationTasks();
|
||||
$this->registerConfigTasks();
|
||||
$this->discoverAnnotationTasks();
|
||||
$this->startScheduler();
|
||||
|
||||
$this->startScheduler();
|
||||
}
|
||||
$this->logger->info('[CrontabProcess] 进程已退出');
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前调度器实例
|
||||
*/
|
||||
public function getScheduler(): ?CrontabScheduler
|
||||
{
|
||||
return $this->scheduler;
|
||||
}
|
||||
/**
|
||||
* 收到 SIGTERM 信号时优雅停止调度器
|
||||
*/
|
||||
public function onSigterm(): void
|
||||
{
|
||||
$this->logger->info('[CrontabProcess] 收到停止信号,正在关闭调度器...');
|
||||
$this->scheduler?->stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* 从已声明的类中发现 #[Crontab] 类注解并注册到 TaskRegistry
|
||||
* CrontabProcess 启动时自动调用,检查所有已加载的类
|
||||
*/
|
||||
private function discoverAnnotationTasks(): void
|
||||
{
|
||||
$beforeCount = TaskRegistry::count();
|
||||
/**
|
||||
* 独立模式入口 — 在 Coroutine::run 中启动调度器
|
||||
* 供 bin/crontab 独立运行模式使用
|
||||
*
|
||||
* @throws \Throwable
|
||||
*/
|
||||
public function run(): void
|
||||
{
|
||||
Coroutine\run(fn() => $this->process(null));
|
||||
}
|
||||
|
||||
foreach (get_declared_classes() as $className) {
|
||||
if (!in_array(TaskInterface::class, class_implements($className), true)) {
|
||||
continue;
|
||||
}
|
||||
/**
|
||||
* 获取当前调度器实例
|
||||
*/
|
||||
public function getScheduler(): ?CrontabScheduler
|
||||
{
|
||||
return $this->scheduler;
|
||||
}
|
||||
|
||||
try {
|
||||
$reflect = new \ReflectionClass($className);
|
||||
if ($reflect->isAbstract()) {
|
||||
continue;
|
||||
}
|
||||
/**
|
||||
* 注册配置文件 (config/crontab.php) 中声明的任务
|
||||
*/
|
||||
private function registerConfigTasks(): void
|
||||
{
|
||||
$taskList = $this->config['tasks'] ?? [];
|
||||
foreach ($taskList as $taskConfig) {
|
||||
try {
|
||||
TaskRegistry::register($taskConfig);
|
||||
} catch (\Throwable $throwable) {
|
||||
$this->logger->warning("[CrontabProcess] 配置任务注册失败: {$throwable->getMessage()}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 读取类上的 #[Crontab] 注解
|
||||
$attributes = $reflect->getAttributes(Annotate\Crontab::class);
|
||||
if (empty($attributes)) {
|
||||
continue;
|
||||
}
|
||||
/**
|
||||
* 从已声明的类中发现 #[Crontab] 类注解并注册到 TaskRegistry
|
||||
* 启动时自动调用,检查所有已加载的类
|
||||
*/
|
||||
private function discoverAnnotationTasks(): void
|
||||
{
|
||||
$beforeCount = TaskRegistry::count();
|
||||
|
||||
/** @var Annotate\Crontab $instance */
|
||||
$instance = $attributes[0]->newInstance();
|
||||
foreach (get_declared_classes() as $className) {
|
||||
if (!in_array(TaskInterface::class, class_implements($className), true)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$scheduleExpression = $instance->buildExpression();
|
||||
if ($scheduleExpression === '') {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
$reflect = new \ReflectionClass($className);
|
||||
if ($reflect->isAbstract()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
TaskRegistry::register([
|
||||
'class' => $className,
|
||||
'name' => $instance->name !== '' ? $instance->name : $className,
|
||||
'expression' => $scheduleExpression,
|
||||
'status' => $instance->status,
|
||||
]);
|
||||
} catch (\Throwable) {
|
||||
// 跳过无法反射或注册失败的类
|
||||
}
|
||||
}
|
||||
$attributes = $reflect->getAttributes(Annotate\Crontab::class);
|
||||
if (empty($attributes)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$afterCount = TaskRegistry::count();
|
||||
if ($afterCount > $beforeCount) {
|
||||
$this->logger->info("[CrontabProcess] 发现注解任务 " . ($afterCount - $beforeCount) . " 个");
|
||||
}
|
||||
}
|
||||
/** @var Annotate\Crontab $instance */
|
||||
$instance = $attributes[0]->newInstance();
|
||||
|
||||
/**
|
||||
* 初始化并启动调度器
|
||||
*/
|
||||
private function startScheduler(): void
|
||||
{
|
||||
$redis = $this->createRedisConnection();
|
||||
$cronExpression = new CronExpression();
|
||||
$scheduleExpression = $instance->buildExpression();
|
||||
if ($scheduleExpression === '') {
|
||||
continue;
|
||||
}
|
||||
|
||||
$schedulerConfig = $this->config['scheduler'] ?? [];
|
||||
TaskRegistry::register([
|
||||
'class' => $className,
|
||||
'name' => $instance->name !== '' ? $instance->name : $className,
|
||||
'expression' => $scheduleExpression,
|
||||
'status' => $instance->status,
|
||||
]);
|
||||
} catch (\Throwable $throwable) {
|
||||
$this->logger->warning("[CrontabProcess] 注解扫描失败: {$className} - {$throwable->getMessage()}");
|
||||
}
|
||||
}
|
||||
|
||||
$this->scheduler = new CrontabScheduler(
|
||||
redis: $redis,
|
||||
cronExpression: $cronExpression,
|
||||
logger: $this->logger,
|
||||
tickInterval: (int)($schedulerConfig['tick_interval'] ?? 1),
|
||||
taskTimeout: (int)($schedulerConfig['task_timeout'] ?? 300),
|
||||
lockTtl: (int)($schedulerConfig['lock_ttl'] ?? 60),
|
||||
lockRenewInterval: (int)($schedulerConfig['lock_renew_interval'] ?? 15),
|
||||
concurrentTasks: (bool)($schedulerConfig['concurrent_tasks'] ?? true),
|
||||
maxConcurrent: (int)($schedulerConfig['max_concurrent'] ?? 10),
|
||||
);
|
||||
$afterCount = TaskRegistry::count();
|
||||
if ($afterCount > $beforeCount) {
|
||||
$this->logger->info("[CrontabProcess] 发现注解任务 " . ($afterCount - $beforeCount) . " 个");
|
||||
}
|
||||
}
|
||||
|
||||
$this->scheduler->start();
|
||||
}
|
||||
/**
|
||||
* 初始化并启动调度器
|
||||
*/
|
||||
private function startScheduler(): void
|
||||
{
|
||||
$redis = $this->createRedisConnection();
|
||||
$cronExpression = new CronExpression;
|
||||
$schedulerConfig = $this->config['scheduler'] ?? [];
|
||||
$this->scheduler = new CrontabScheduler(redis: $redis, cronExpression: $cronExpression, logger: $this->logger, tickInterval: (int)($schedulerConfig['tick_interval'] ?? 1), taskTimeout: (int)($schedulerConfig['task_timeout'] ?? 300), lockTtl: (int)($schedulerConfig['lock_ttl'] ?? 60), lockRenewInterval: (int)($schedulerConfig['lock_renew_interval'] ?? 15), concurrentTasks: (bool)($schedulerConfig['concurrent_tasks'] ?? true), maxConcurrent: (int)($schedulerConfig['max_concurrent'] ?? 10),);
|
||||
|
||||
/**
|
||||
* 创建 Redis 连接
|
||||
* 在独立模式下直接创建连接,不通过 kiri-core 的连接池
|
||||
*/
|
||||
private function createRedisConnection(): \Redis
|
||||
{
|
||||
$redisConfig = $this->config['redis'] ?? [];
|
||||
$this->scheduler->start();
|
||||
}
|
||||
|
||||
$host = $redisConfig['host'] ?? '127.0.0.1';
|
||||
$port = (int)($redisConfig['port'] ?? 6379);
|
||||
$auth = $redisConfig['auth'] ?? '';
|
||||
$databases = (int)($redisConfig['databases'] ?? 0);
|
||||
$timeout = (int)($redisConfig['timeout'] ?? 30);
|
||||
$prefix = $redisConfig['prefix'] ?? '';
|
||||
/**
|
||||
* 创建 Redis 连接
|
||||
* 在独立模式下直接创建连接,不通过 kiri-core 的连接池
|
||||
*/
|
||||
private function createRedisConnection(): \Redis
|
||||
{
|
||||
$redisConfig = $this->config['redis'] ?? [];
|
||||
|
||||
$redis = new \Redis();
|
||||
if (!$redis->connect($host, $port, $timeout)) {
|
||||
throw new \RuntimeException("Redis 连接失败: {$host}:{$port}");
|
||||
}
|
||||
$host = $redisConfig['host'] ?? '127.0.0.1';
|
||||
$port = (int)($redisConfig['port'] ?? 6379);
|
||||
$auth = $redisConfig['auth'] ?? '';
|
||||
$databases = (int)($redisConfig['databases'] ?? 0);
|
||||
$timeout = (int)($redisConfig['timeout'] ?? 30);
|
||||
$prefix = $redisConfig['prefix'] ?? '';
|
||||
|
||||
if (!empty($auth) && !$redis->auth($auth)) {
|
||||
throw new \RuntimeException("Redis 认证失败: {$redis->getLastError()}");
|
||||
}
|
||||
$redis = new \Redis;
|
||||
if (!$redis->connect($host, $port, $timeout)) {
|
||||
throw new \RuntimeException("Redis 连接失败: {$host}:{$port}");
|
||||
}
|
||||
|
||||
$redis->select($databases);
|
||||
if (!empty($auth) && !$redis->auth($auth)) {
|
||||
throw new \RuntimeException("Redis 认证失败: {$redis->getLastError()}");
|
||||
}
|
||||
|
||||
if (!empty($prefix)) {
|
||||
$redis->setOption(\Redis::OPT_PREFIX, $prefix);
|
||||
}
|
||||
$redis->select($databases);
|
||||
|
||||
$this->logger->info("[CrontabProcess] Redis 已连接: {$host}:{$port} DB:{$databases}");
|
||||
if (!empty($prefix)) {
|
||||
$redis->setOption(\Redis::OPT_PREFIX, $prefix);
|
||||
}
|
||||
|
||||
return $redis;
|
||||
}
|
||||
$this->logger->info("[CrontabProcess] Redis 已连接: {$host}:{$port} DB:{$databases}");
|
||||
|
||||
return $redis;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -4,10 +4,12 @@ declare(strict_types=1);
|
||||
namespace Kiri\Crontab;
|
||||
|
||||
use Kiri\Abstracts\Providers;
|
||||
use Symfony\Component\Console\Application;
|
||||
use Kiri\Application;
|
||||
use Psr\Container\ContainerExceptionInterface;
|
||||
use Psr\Container\NotFoundExceptionInterface;
|
||||
|
||||
/**
|
||||
* kiri-core 框架集成 Provider — 将 CrontabCommand 注册到 Symfony Console
|
||||
* kiri-core 框架集成 Provider — 将 CrontabCommand 注册到 Console Application
|
||||
*
|
||||
* 在 kiri-core 项目的 config/servers.php 中配置:
|
||||
* ```php
|
||||
@@ -15,28 +17,21 @@ use Symfony\Component\Console\Application;
|
||||
* \Kiri\Crontab\CrontabProcess::class,
|
||||
* ],
|
||||
* ```
|
||||
*
|
||||
* 在应用 Kernel 的 getCommands() 中添加:
|
||||
* ```php
|
||||
* public function getCommands(): array
|
||||
* {
|
||||
* return [
|
||||
* \Kiri\Crontab\CrontabCommand::class,
|
||||
* ];
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
class CrontabProviders extends Providers
|
||||
{
|
||||
|
||||
/**
|
||||
* 注册 CrontabCommand 到 Console Application
|
||||
*
|
||||
* @return void
|
||||
* @throws ContainerExceptionInterface
|
||||
* @throws NotFoundExceptionInterface
|
||||
*/
|
||||
public function onImport(): void
|
||||
{
|
||||
$command = $this->container->get(CrontabCommand::class);
|
||||
$console = $this->container->get(Application::class);
|
||||
$console->addCommand($command);
|
||||
$console->command(CrontabCommand::class);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
+228
-36
@@ -3,6 +3,10 @@ declare(strict_types=1);
|
||||
|
||||
namespace Kiri\Crontab;
|
||||
|
||||
use Kiri\Abstracts\Component;
|
||||
use Kiri\Crontab\Events\OnTaskBeforeExecute;
|
||||
use Kiri\Crontab\Events\OnTaskExecuted;
|
||||
use Kiri\Crontab\Events\OnTaskFailed;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use Psr\Log\NullLogger;
|
||||
use Swoole\Coroutine;
|
||||
@@ -17,6 +21,7 @@ use Swoole\Coroutine;
|
||||
* 4. 执行任务并更新下次调度时间
|
||||
* 5. 支持协程并发执行多个到期任务
|
||||
* 6. 支持运行时动态投递和取消任务
|
||||
* 7. 通过 kiri-core 事件系统分发任务生命周期事件
|
||||
*
|
||||
* Redis 数据结构:
|
||||
* crontab:queue — ZSET, score=下次执行时间戳, member=taskKey
|
||||
@@ -25,7 +30,7 @@ use Swoole\Coroutine;
|
||||
* crontab:lock:task:{key} — String, 任务执行锁
|
||||
* crontab:running — SET, 当前执行中的任务
|
||||
*/
|
||||
class CrontabScheduler
|
||||
class CrontabScheduler extends Component
|
||||
{
|
||||
|
||||
/** @var string Redis key 前缀 */
|
||||
@@ -49,19 +54,24 @@ class CrontabScheduler
|
||||
/** @var int 默认主锁 TTL (秒) */
|
||||
private const DEFAULT_LOCK_TTL = 60;
|
||||
|
||||
private bool $running = false;
|
||||
/** @var int 每次拉取到期任务的最大数量 */
|
||||
private const MAX_DUE_TASKS_PER_TICK = 100;
|
||||
|
||||
/** @var string|null 当前正在执行的任务 key (协程安全) */
|
||||
private ?string $currentTaskKey = null;
|
||||
private bool $running = false;
|
||||
|
||||
/** @var self|null 全局实例引用,供任务内部访问 */
|
||||
private static ?self $instance = null;
|
||||
|
||||
/** @var LoggerInterface|null 兜底日志,当容器未初始化时使用 */
|
||||
private ?LoggerInterface $fallbackLogger;
|
||||
|
||||
/** @var string|null 非协程环境下当前任务 key 回退存储 */
|
||||
private ?string $fallbackCurrentTaskKey = null;
|
||||
|
||||
/**
|
||||
* @param \Redis $redis Redis 客户端
|
||||
* @param TaskRegistry $registry 任务注册中心
|
||||
* @param CronExpression $cronExpression Cron 表达式解析器
|
||||
* @param LoggerInterface $logger 日志记录器
|
||||
* @param LoggerInterface|null $logger 日志记录器(容器不可用时的兜底)
|
||||
* @param int $tickInterval tick 间隔 (秒)
|
||||
* @param int $taskTimeout 任务执行超时 (秒)
|
||||
* @param int $lockTtl 主锁 TTL (秒)
|
||||
@@ -72,7 +82,7 @@ class CrontabScheduler
|
||||
public function __construct(
|
||||
private \Redis $redis,
|
||||
private CronExpression $cronExpression,
|
||||
private LoggerInterface $logger = new NullLogger(),
|
||||
?LoggerInterface $logger = null,
|
||||
private int $tickInterval = self::DEFAULT_TICK_INTERVAL,
|
||||
private int $taskTimeout = self::DEFAULT_TASK_TIMEOUT,
|
||||
private int $lockTtl = self::DEFAULT_LOCK_TTL,
|
||||
@@ -80,6 +90,8 @@ class CrontabScheduler
|
||||
private bool $concurrentTasks = true,
|
||||
private int $maxConcurrent = 10,
|
||||
) {
|
||||
parent::__construct();
|
||||
$this->fallbackLogger = $logger;
|
||||
self::$instance = $this;
|
||||
}
|
||||
|
||||
@@ -102,7 +114,7 @@ class CrontabScheduler
|
||||
{
|
||||
$this->running = true;
|
||||
|
||||
$this->logger->info('[CrontabScheduler] 调度器启动,注册 ' . TaskRegistry::count() . ' 个任务');
|
||||
$this->logInfo('[CrontabScheduler] 调度器启动,注册 ' . TaskRegistry::count() . ' 个任务');
|
||||
|
||||
$this->syncTasks();
|
||||
|
||||
@@ -125,12 +137,12 @@ class CrontabScheduler
|
||||
try {
|
||||
$this->tick();
|
||||
} catch (\Throwable $throwable) {
|
||||
$this->logger->error('[CrontabScheduler] tick 异常: ' . $throwable->getMessage());
|
||||
$this->logError('[CrontabScheduler] tick 异常: ' . $throwable->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
self::$instance = null;
|
||||
$this->logger->info('[CrontabScheduler] 调度器已停止');
|
||||
$this->logInfo('[CrontabScheduler] 调度器已停止');
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -188,7 +200,7 @@ class CrontabScheduler
|
||||
|
||||
$this->persistNewTask($taskConfig);
|
||||
|
||||
$this->logger->info("[CrontabScheduler] 动态投递任务: {$taskKey} '{$taskConfig->name}' '{$expression}'");
|
||||
$this->logInfo("[CrontabScheduler] 动态投递任务: {$taskKey} '{$taskConfig->name}' '{$expression}'");
|
||||
|
||||
return $taskKey;
|
||||
}
|
||||
@@ -211,7 +223,7 @@ class CrontabScheduler
|
||||
$this->redis->del($hashKey);
|
||||
$this->redis->del($this->getTaskLockKey($taskKey));
|
||||
|
||||
$this->logger->info("[CrontabScheduler] 任务已取消: {$taskKey}");
|
||||
$this->logInfo("[CrontabScheduler] 任务已取消: {$taskKey}");
|
||||
|
||||
return true;
|
||||
}
|
||||
@@ -231,17 +243,37 @@ class CrontabScheduler
|
||||
*/
|
||||
public function cancelCurrentTask(): void
|
||||
{
|
||||
if ($this->currentTaskKey !== null) {
|
||||
$this->markTaskForRemoval($this->currentTaskKey);
|
||||
$taskKey = $this->getCurrentTaskKey();
|
||||
if ($taskKey !== null) {
|
||||
$this->markTaskForRemoval($taskKey);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取当前正在执行的任务 key (供任务内部使用)
|
||||
* 协程环境通过 Swoole Coroutine Context 实现协程安全
|
||||
* 非协程环境回退到实例属性
|
||||
*/
|
||||
public function getCurrentTaskKey(): ?string
|
||||
{
|
||||
return $this->currentTaskKey;
|
||||
$context = Coroutine::getContext();
|
||||
if ($context === null) {
|
||||
return $this->fallbackCurrentTaskKey;
|
||||
}
|
||||
return $context['crontab_current_task_key'] ?? null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 设置当前协程正在执行的任务 key
|
||||
*/
|
||||
private function setCurrentTaskKey(?string $taskKey): void
|
||||
{
|
||||
$context = Coroutine::getContext();
|
||||
if ($context === null) {
|
||||
$this->fallbackCurrentTaskKey = $taskKey;
|
||||
return;
|
||||
}
|
||||
$context['crontab_current_task_key'] = $taskKey;
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────
|
||||
@@ -250,11 +282,15 @@ class CrontabScheduler
|
||||
|
||||
/**
|
||||
* 同步任务到 Redis
|
||||
* 将 Registry 中的任务写入 Redis,同时清理 Redis 中已不在 Registry 的幽灵任务
|
||||
*/
|
||||
public function syncTasks(): void
|
||||
{
|
||||
$registryTaskKeys = [];
|
||||
|
||||
foreach (TaskRegistry::all() as $taskKey => $taskConfig) {
|
||||
$hashKey = $this->getTaskHashKey($taskKey);
|
||||
$registryTaskKeys[] = $hashKey;
|
||||
|
||||
if (!$this->redis->exists($hashKey)) {
|
||||
$this->persistNewTask($taskConfig);
|
||||
@@ -263,6 +299,41 @@ class CrontabScheduler
|
||||
|
||||
$this->mergeExistingTask($taskConfig);
|
||||
}
|
||||
|
||||
$this->cleanOrphanedTasks($registryTaskKeys);
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理 Redis 中已不在 Registry 的幽灵任务
|
||||
*
|
||||
* @param array<string> $activeHashKeys 当前 Registry 中存在的 hash key 列表
|
||||
*/
|
||||
private function cleanOrphanedTasks(array $activeHashKeys): void
|
||||
{
|
||||
$activeHashKeyMap = array_flip($activeHashKeys);
|
||||
|
||||
$redisTaskKeys = $this->redis->keys(self::KEY_PREFIX . ':task:*');
|
||||
if (empty($redisTaskKeys)) {
|
||||
return;
|
||||
}
|
||||
|
||||
$removedCount = 0;
|
||||
foreach ($redisTaskKeys as $hashKey) {
|
||||
if (isset($activeHashKeyMap[$hashKey])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$taskKey = substr($hashKey, strlen(self::KEY_PREFIX . ':task:'));
|
||||
$this->removeFromQueue($taskKey);
|
||||
$this->redis->del($hashKey);
|
||||
$this->redis->del($this->getTaskLockKey($taskKey));
|
||||
$this->redis->del('crontab:removal:' . $taskKey);
|
||||
$removedCount++;
|
||||
}
|
||||
|
||||
if ($removedCount > 0) {
|
||||
$this->logInfo("[CrontabScheduler] 清理幽灵任务 {$removedCount} 个");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -301,7 +372,7 @@ class CrontabScheduler
|
||||
$this->redis->hSet($hashKey, 'status', 'paused');
|
||||
$this->redis->zRem(self::QUEUE_KEY, $taskKey);
|
||||
|
||||
$this->logger->info("[CrontabScheduler] 任务已暂停: {$taskKey}");
|
||||
$this->logInfo("[CrontabScheduler] 任务已暂停: {$taskKey}");
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -328,7 +399,7 @@ class CrontabScheduler
|
||||
$this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskKey);
|
||||
}
|
||||
|
||||
$this->logger->info("[CrontabScheduler] 任务已恢复: {$taskKey}");
|
||||
$this->logInfo("[CrontabScheduler] 任务已恢复: {$taskKey}");
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -393,15 +464,17 @@ class CrontabScheduler
|
||||
{
|
||||
$now = time();
|
||||
|
||||
$dueTaskKeys = $this->redis->zRangeByScore(self::QUEUE_KEY, '-inf', $now, ['limit' => [0, 100]]);
|
||||
$dueTaskKeys = $this->redis->zRangeByScore(
|
||||
self::QUEUE_KEY,
|
||||
'-inf',
|
||||
$now,
|
||||
['limit' => [0, self::MAX_DUE_TASKS_PER_TICK]]
|
||||
);
|
||||
|
||||
if (empty($dueTaskKeys)) {
|
||||
$this->renewMasterLock();
|
||||
return;
|
||||
}
|
||||
|
||||
$this->renewMasterLock();
|
||||
|
||||
if ($this->concurrentTasks) {
|
||||
$this->executeTasksConcurrently($dueTaskKeys, $now);
|
||||
} else {
|
||||
@@ -413,13 +486,34 @@ class CrontabScheduler
|
||||
|
||||
/**
|
||||
* 并发执行到期任务 (通过协程)
|
||||
* 在执行期间持续续期主锁,防止并发任务耗时超过锁 TTL
|
||||
* 非协程环境下自动回退到顺序执行
|
||||
*/
|
||||
private function executeTasksConcurrently(array $taskKeys, int $now): void
|
||||
{
|
||||
// 非协程环境回退到顺序执行,避免死锁
|
||||
if (Coroutine::getCid() <= 0) {
|
||||
foreach ($taskKeys as $taskKey) {
|
||||
$this->executeSingleTask($taskKey, $now);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
$batchSize = min(count($taskKeys), $this->maxConcurrent);
|
||||
|
||||
$channel = new Coroutine\Channel($batchSize);
|
||||
|
||||
// 启动主锁续期协程,在并发任务执行期间持续续期
|
||||
$stopRenewal = false;
|
||||
Coroutine::create(function () use (&$stopRenewal) {
|
||||
while (!$stopRenewal) {
|
||||
Coroutine::sleep($this->lockRenewInterval);
|
||||
if (!$stopRenewal) {
|
||||
$this->renewMasterLock();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
foreach ($taskKeys as $taskKey) {
|
||||
$channel->push(true);
|
||||
|
||||
@@ -427,17 +521,21 @@ class CrontabScheduler
|
||||
try {
|
||||
$this->executeSingleTask($taskKey, $now);
|
||||
} catch (\Throwable $throwable) {
|
||||
$this->logger->error("[CrontabScheduler] 并发执行异常: {$taskKey} - {$throwable->getMessage()}");
|
||||
$this->logError("[CrontabScheduler] 并发执行异常: {$taskKey} - {$throwable->getMessage()}");
|
||||
} finally {
|
||||
$channel->pop();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// 等待所有协程完成
|
||||
for ($i = 0; $i < $batchSize; $i++) {
|
||||
$channel->push(true);
|
||||
}
|
||||
$channel->close();
|
||||
|
||||
// 停止锁续期协程
|
||||
$stopRenewal = true;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -461,19 +559,25 @@ class CrontabScheduler
|
||||
}
|
||||
|
||||
if (!$this->acquireTaskLock($taskKey)) {
|
||||
$this->logger->warning("[CrontabScheduler] 任务锁获取失败 (可能仍在执行中): {$taskKey}");
|
||||
$this->logWarning("[CrontabScheduler] 任务锁获取失败 (可能仍在执行中): {$taskKey}");
|
||||
return;
|
||||
}
|
||||
|
||||
$startTime = microtime(true);
|
||||
|
||||
// 记录当前任务 key,供任务内部通过 cancelCurrentTask() 取消自身
|
||||
$this->currentTaskKey = $taskKey;
|
||||
// 通过 Swoole Coroutine Context 记录当前任务 key,确保协程安全
|
||||
$this->setCurrentTaskKey($taskKey);
|
||||
|
||||
try {
|
||||
$this->redis->sAdd(self::RUNNING_SET_KEY, $taskKey);
|
||||
|
||||
$this->logger->info("[CrontabScheduler] 开始执行: {$taskKey} ({$config->name})");
|
||||
$this->logInfo("[CrontabScheduler] 开始执行: {$taskKey} ({$config->name})");
|
||||
|
||||
$this->dispatchEvent(new OnTaskBeforeExecute(
|
||||
taskKey: $taskKey,
|
||||
className: $config->className,
|
||||
taskName: $config->name,
|
||||
));
|
||||
|
||||
$className = $config->className;
|
||||
if (!class_exists($className)) {
|
||||
@@ -486,11 +590,15 @@ class CrontabScheduler
|
||||
|
||||
$duration = round(microtime(true) - $startTime, 4);
|
||||
|
||||
$this->logger->info("[CrontabScheduler] 执行成功: {$taskKey} 耗时 {$duration}s");
|
||||
$this->logInfo("[CrontabScheduler] 执行成功: {$taskKey} 耗时 {$duration}s");
|
||||
|
||||
$this->finalizeTaskSuccess($config, $now, $duration);
|
||||
} catch (\Throwable $throwable) {
|
||||
$duration = round(microtime(true) - $startTime, 4);
|
||||
|
||||
$this->logger->error("[CrontabScheduler] 执行失败: {$taskKey} 耗时 {$duration}s 错误: {$throwable->getMessage()}");
|
||||
$this->logError("[CrontabScheduler] 执行失败: {$taskKey} 耗时 {$duration}s 错误: {$throwable->getMessage()}");
|
||||
|
||||
$this->finalizeTaskFailure($config, $now, $duration, $throwable);
|
||||
} finally {
|
||||
$this->redis->sRem(self::RUNNING_SET_KEY, $taskKey);
|
||||
|
||||
@@ -502,20 +610,54 @@ class CrontabScheduler
|
||||
$this->redis->del($hashKey);
|
||||
$this->redis->del($this->getTaskLockKey($taskKey));
|
||||
$this->clearRemovalFlag($taskKey);
|
||||
$this->logger->info("[CrontabScheduler] 任务已自毁: {$taskKey}");
|
||||
$this->logInfo("[CrontabScheduler] 任务已自毁: {$taskKey}");
|
||||
} else {
|
||||
$this->finalizeTaskExecution($config, $now);
|
||||
$this->finalizeTaskScheduling($config, $now);
|
||||
}
|
||||
|
||||
$this->releaseTaskLock($taskKey);
|
||||
$this->currentTaskKey = null;
|
||||
$this->setCurrentTaskKey(null);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 完成执行后的任务状态更新
|
||||
* 任务执行成功后的生命周期处理
|
||||
*/
|
||||
private function finalizeTaskExecution(TaskConfig $config, int $now): void
|
||||
private function finalizeTaskSuccess(TaskConfig $config, int $now, float $duration): void
|
||||
{
|
||||
$nextRun = $this->cronExpression->getNextRunTime($config->expression, $now);
|
||||
$isOneShot = $this->cronExpression->isOneShot($config->expression);
|
||||
|
||||
$this->dispatchEvent(new OnTaskExecuted(
|
||||
taskKey: $config->taskKey,
|
||||
className: $config->className,
|
||||
taskName: $config->name,
|
||||
duration: $duration,
|
||||
nextRun: $isOneShot ? 0 : $nextRun,
|
||||
));
|
||||
}
|
||||
|
||||
/**
|
||||
* 任务执行失败后的生命周期处理
|
||||
*/
|
||||
private function finalizeTaskFailure(TaskConfig $config, int $now, float $duration, \Throwable $error): void
|
||||
{
|
||||
$nextRun = $this->cronExpression->getNextRunTime($config->expression, $now);
|
||||
|
||||
$this->dispatchEvent(new OnTaskFailed(
|
||||
taskKey: $config->taskKey,
|
||||
className: $config->className,
|
||||
taskName: $config->name,
|
||||
error: $error,
|
||||
duration: $duration,
|
||||
nextRun: $nextRun,
|
||||
));
|
||||
}
|
||||
|
||||
/**
|
||||
* 完成执行后的任务调度状态更新
|
||||
*/
|
||||
private function finalizeTaskScheduling(TaskConfig $config, int $now): void
|
||||
{
|
||||
$taskKey = $config->taskKey;
|
||||
$hashKey = $this->getTaskHashKey($taskKey);
|
||||
@@ -525,7 +667,7 @@ class CrontabScheduler
|
||||
if ($isOneShot) {
|
||||
$this->removeFromQueue($taskKey);
|
||||
$this->redis->del($hashKey);
|
||||
$this->logger->info("[CrontabScheduler] 一次性任务已移除: {$taskKey}");
|
||||
$this->logInfo("[CrontabScheduler] 一次性任务已移除: {$taskKey}");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -547,10 +689,23 @@ class CrontabScheduler
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过 kiri-core 事件系统分发事件
|
||||
* 容器不可用时静默跳过
|
||||
*/
|
||||
private function dispatchEvent(object $event): void
|
||||
{
|
||||
try {
|
||||
$this->getDispatch()->dispatch($event);
|
||||
} catch (\Throwable) {
|
||||
// 容器未初始化或事件系统不可用时跳过
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 标记任务为"下次执行后移除"
|
||||
* 当任务内部调用 cancelCurrentTask() 时,不立即删除(执行中操作安全),仅标记
|
||||
* 等当前执行完成后,executeSingleTask 的 finally 块会检查此标记并清理
|
||||
* 等当前执行完成后,finalize 块会检查此标记并清理
|
||||
*/
|
||||
private function markTaskForRemoval(string $taskKey): void
|
||||
{
|
||||
@@ -621,7 +776,7 @@ class CrontabScheduler
|
||||
$this->redis->zAdd(self::QUEUE_KEY, $nextRun, $taskConfig->taskKey);
|
||||
}
|
||||
|
||||
$this->logger->info("[CrontabScheduler] 新任务已注册: {$taskConfig->taskKey} 下次执行: " . date('Y-m-d H:i:s', $nextRun));
|
||||
$this->logInfo("[CrontabScheduler] 新任务已注册: {$taskConfig->taskKey} 下次执行: " . date('Y-m-d H:i:s', $nextRun));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -680,4 +835,41 @@ class CrontabScheduler
|
||||
return self::KEY_PREFIX . ':lock:task:' . $taskKey;
|
||||
}
|
||||
|
||||
// ──────────────────────────────────────────────
|
||||
// 日志辅助 — 兼容容器未初始化的场景
|
||||
// ──────────────────────────────────────────────
|
||||
|
||||
private function logInfo(string $message): void
|
||||
{
|
||||
$this->log('info', $message);
|
||||
}
|
||||
|
||||
private function logError(string $message): void
|
||||
{
|
||||
$this->log('error', $message);
|
||||
}
|
||||
|
||||
private function logWarning(string $message): void
|
||||
{
|
||||
$this->log('warning', $message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 统一日志输出入口
|
||||
* 优先使用 Component::getLogger(),容器不可用时回退到注入的 fallbackLogger
|
||||
*/
|
||||
private function log(string $level, string $message): void
|
||||
{
|
||||
try {
|
||||
$logger = $this->getLogger();
|
||||
$logger->{$level}($message);
|
||||
return;
|
||||
} catch (\Throwable) {
|
||||
}
|
||||
|
||||
if ($this->fallbackLogger instanceof LoggerInterface) {
|
||||
$this->fallbackLogger->{$level}($message);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user