From d8e7b888118bee2265f16641b3b30dc5700da1cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mr=C2=B7x?= Date: Thu, 5 Aug 2021 16:47:59 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Kafka/ConsumerInterface.php | 7 ------- Kafka/Kafka.php | 12 +----------- Kafka/Message.php | 31 +++++++++++++++++++++++++++++++ 3 files changed, 32 insertions(+), 18 deletions(-) create mode 100644 Kafka/Message.php diff --git a/Kafka/ConsumerInterface.php b/Kafka/ConsumerInterface.php index 13dcfca4..fabcac91 100644 --- a/Kafka/ConsumerInterface.php +++ b/Kafka/ConsumerInterface.php @@ -12,13 +12,6 @@ interface ConsumerInterface { - /** - * @param Struct $struct - */ - public function setParams(Struct $struct): void; - - - /** * @return mixed */ diff --git a/Kafka/Kafka.php b/Kafka/Kafka.php index 7563075a..c2bddbd0 100644 --- a/Kafka/Kafka.php +++ b/Kafka/Kafka.php @@ -124,17 +124,7 @@ class Kafka implements CustomProcess $setting = $server->setting['worker_num']; - $container = Snowflake::app()->get('kafka-container'); - $handler = $container->getConsumer($topic); - - var_dump($container); - if (!empty($handler)) { - /** @var ConsumerInterface $data */ - $data = new $handler(); - $data->setParams(new Struct($topic, $message)); - - $server->sendMessage($data, random_int(0, $setting - 1)); - } + $server->sendMessage(new Message(new Struct($topic, $message)), random_int(0, $setting - 1)); } catch (Throwable $exception) { logger()->addError($exception, 'throwable'); } diff --git a/Kafka/Message.php b/Kafka/Message.php new file mode 100644 index 00000000..e57d1acb --- /dev/null +++ b/Kafka/Message.php @@ -0,0 +1,31 @@ +