diff --git a/Kafka/Kafka.php b/Kafka/Kafka.php index 5917bc36..87bd0199 100644 --- a/Kafka/Kafka.php +++ b/Kafka/Kafka.php @@ -11,6 +11,7 @@ use RdKafka\Exception; use RdKafka\KafkaConsumer; use RdKafka\TopicConf; use ReflectionException; +use Server\ServerManager; use Server\SInterface\CustomProcess; use Snowflake\Abstracts\Config; use Snowflake\Exception\ConfigException; @@ -133,13 +134,11 @@ class Kafka implements CustomProcess $container = Snowflake::app()->get('kafka-container'); $handler = $container->getConsumer($topic); - if (empty($handler)) { - return; + if (!empty($handler)) { + $data = new Struct($topic, $message); + + $server->sendMessage(new $handler($data), random_int(0, $setting - 1)); } - - $message = swoole_serialize(['action' => 'kafka', 'handler' => $handler, 'body' => [$topic, $message]]); - - $server->sendMessage($message, random_int(0, $setting - 1)); } catch (Throwable $exception) { logger()->addError($exception, 'throwable'); }