$this->onKafkaWorker($swoole_unserialize),
                'crontab' => $this->onCrontabWorker($swoole_unserialize),
                default => $this->onMessageWorker($server, $src_worker_id, $message)
            };
        } catch (\Throwable $exception) {
            $this->addError($exception);
        } finally {
            fire(Event::SYSTEM_RESOURCE_RELEASES);
        }
    }

    /**
     * Runs the crontab task carried by a worker message.
     *
     * Reads the task object from the 'handler' key, calls increment() on it and
     * then execute() on the value increment() returns.
     *
     * @param array $message Message payload; only the 'handler' key is read here.
     * @return string Always 'success' once the task has been executed.
     * @throws \InvalidArgumentException When the message has no (or a null) 'handler'.
     */
    private function onCrontabWorker(array $message): string
    {
        /** @var \Snowflake\Crontab\Crontab|null $crontab */
        $crontab = $message['handler'] ?? null;

        // The lookup above tolerates a missing handler, so reject it explicitly
        // instead of failing with "Call to a member function increment() on null".
        if ($crontab === null) {
            throw new \InvalidArgumentException('Crontab message does not contain a "handler".');
        }

        $crontab->increment()->execute();

        return 'success';
    }

    /**
     * Forwards a pipe message to the listeners of Event::PIPE_MESSAGE.
     *
     * The three arguments are passed to fire() unchanged, in the same order.
     *
     * @param $server
     * @param $src_worker_id
     * @param $message
     * @return string Always 'success' after the event has been fired.
     * @throws \Exception
     */
    private function onMessageWorker($server, $src_worker_id, $message): string
    {
        fire(Event::PIPE_MESSAGE, [$server, $src_worker_id, $message]);

        return 'success';
    }

    /**
     * Hands a Kafka message to the handler supplied with it.
     *
     * $message['body'] is destructured as a two-element list [topic, message];
     * both values are wrapped in a Struct, which is passed as the single
     * argument to the callable stored under $message['handler'].
     *
     * @param array $message Payload; the 'body' and 'handler' keys are read.
     * @return string Always 'success' after the handler has returned.
     * @throws \ReflectionException
     * @throws \Snowflake\Exception\NotFindClassException
     */
    private function onKafkaWorker(array $message): string
    {
        [$topic, $rdMessage] = $message['body'];

        call_user_func($message['handler'], new Struct($topic, $rdMessage));

        return 'success';
    }
}