diff --git a/Kafka/Kafka.php b/Kafka/Kafka.php index a24fc111..c6423beb 100644 --- a/Kafka/Kafka.php +++ b/Kafka/Kafka.php @@ -127,14 +127,15 @@ class Kafka implements CustomProcess $setting = $server->setting['worker_num']; - /** @var KafkaProvider $container */ $container = Snowflake::app()->get('kafka-container'); $handler = $container->getConsumer($topic); if (!empty($handler)) { - $data = new Struct($topic, $message); + /** @var ConsumerInterface $data */ + $data = new $handler(); + $data->setParams(new Struct($topic, $message)); - $server->sendMessage(new $handler($data), random_int(0, $setting - 1)); + $server->sendMessage($data, random_int(0, $setting - 1)); } } catch (Throwable $exception) { logger()->addError($exception, 'throwable');