pid . ']'; if (!empty($prefix)) { $name .= '.Crontab zookeeper'; } return $name; } /** * @param Process $process * @throws Exception */ public function onHandler(Process $process): void { Timer::tick(300, [$this, 'loop']); } /** * @throws ConfigException * @throws Exception */ public function loop() { $redis = Kiri::app()->getRedis(); defer(fn() => $redis->release()); $range = $this->loadCarobTask($redis); foreach ($range as $value) { $this->dispatch($redis, $value); } } /** * @param Redis|\Redis $redis * @param $value * @throws Exception */ private function dispatch(Redis|\Redis $redis, $value) { try { if (empty($handler = $redis->get('crontab:' . $value))) { return; } $server = di(ServerManager::class)->getServer(); $server->sendMessage(swoole_unserialize($handler), $this->getWorker()); } catch (Throwable $exception) { logger()->addError($exception); } } /** * @return int * @throws Exception */ private function getWorker(): int { if ($this->workerNum == 0) { $server = di(ServerManager::class)->getServer(); $this->workerNum = $server->setting['worker_num'] + ($server->setting['task_worker_num'] ?? 0); } return random_int(0, $this->workerNum - 1); } /** * @param Redis|\Redis $redis * @return array */ private function loadCarobTask(Redis|\Redis $redis): array { $script = <<