$this->onKafkaWorker($swollen_universalize), 'crontab' => $this->onCrontabWorker($swollen_universalize), default => $this->onMessageWorker($server, $src_worker_id, $swollen_universalize) }; fire(Event::SYSTEM_RESOURCE_RELEASES); }); } /** * @param array $message * @return string * @throws Exception */ private function onCrontabWorker(array $message): string { if (empty($handler = $message['handler'] ?? null)) { throw new Exception('unknown handler'); } /** @var Crontab $handler */ $handler->increment()->execute(); return 'success'; } /** * @param $server * @param $src_worker_id * @param $message * @return string * @throws Exception */ private function onMessageWorker($server, $src_worker_id, $message): string { fire(Event::PIPE_MESSAGE, [$server, $src_worker_id, $message]); return 'success'; } /** * @param array $message * @return string */ private function onKafkaWorker(array $message): string { [$topic, $rdMessage] = $message['body']; call_user_func($message['handler'], new Struct($topic, $rdMessage)); return 'success'; } }