$this->onKafkaWorker($swoole_unserialize), default => $this->onMessageWorker($server, $src_worker_id, $message) }; } catch (\Throwable $exception) { $this->addError($exception); } finally { fire(Event::SYSTEM_RESOURCE_RELEASES); } } /** * @param $server * @param $src_worker_id * @param $message * @throws \Exception */ private function onMessageWorker($server, $src_worker_id, $message) { fire(Event::PIPE_MESSAGE, [$server, $src_worker_id, $message]); return 'success'; } /** * @param array $message * @throws \ReflectionException * @throws \Snowflake\Exception\NotFindClassException */ private function onKafkaWorker(array $message) { [$topic, $message] = $message['body']; /** @var TaskContainer $container */ $container = Snowflake::app()->get('kafka-container'); $container->process($topic, new Struct($topic, $message)); return 'success'; } }