From 73c148e1a18e9ae22f181073f26c21eef9df5fa9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mr=C2=B7x?= Date: Thu, 5 Aug 2021 17:41:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Kafka/Message.php | 37 ---------- Kafka/Producer.php | 149 ---------------------------------------- Kafka/Struct.php | 8 +-- System/Di/Container.php | 7 +- 4 files changed, 6 insertions(+), 195 deletions(-) delete mode 100644 Kafka/Message.php delete mode 100644 Kafka/Producer.php diff --git a/Kafka/Message.php b/Kafka/Message.php deleted file mode 100644 index 81e8e2a0..00000000 --- a/Kafka/Message.php +++ /dev/null @@ -1,37 +0,0 @@ -get(KafkaProvider::class); - $data = $container->getConsumer($this->struct->topic); - var_dump($data); - } - -} diff --git a/Kafka/Producer.php b/Kafka/Producer.php deleted file mode 100644 index fb541829..00000000 --- a/Kafka/Producer.php +++ /dev/null @@ -1,149 +0,0 @@ -conf = di(Conf::class); - $this->conf->set('metadata.broker.list', $brokers); - $this->conf->set('group.id', $groupId); - $this->conf->setErrorCb(function ($kafka, $err, $reason) { - $this->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason)); - }); - $this->topicConf = di(TopicConf::class); - } - - /** - * @param array $params - * @param string|null $groupId - * @throws Exception - */ - public function dispatch(array $params = [], string $groupId = null) - { - $this->sendMessage([$params]); - } - - /** - * @return \RdKafka\Producer - * @throws Exception - */ - private function getProducer(): \RdKafka\Producer - { - return Snowflake::getDi()->get(\RdKafka\Producer::class, [$this->conf]); - } - - - /** - * @param \RdKafka\Producer $producer - * @param $topic - * @return ProducerTopic - * @throws Exception - */ - private function getProducerTopic(\RdKafka\Producer $producer, $topic): ProducerTopic - { - return $producer->newTopic($topic, $this->topicConf); - } - - - /** - * @param string|null $key - * @param array $data - * @param string|null $groupId - * @throws ConfigException - * @throws Exception - */ - public function batch(?string $key, array $data, ?string $groupId = null) - { - $this->sendMessage($data, $key); - } - - - /** - * @param array $message - * @param string $key - * @throws Exception - */ - private function sendMessage(array $message, string $key = '') - { - $producer = $this->getProducer(); - $producerTopic = $this->getProducerTopic($producer, $this->topic); - if ($this->isAck) { - $this->flush($producer); - } - foreach ($message as $value) { - $producerTopic->produce(RD_KAFKA_PARTITION_UA, 0, swoole_serialize($value), $key); - $producer->poll(0); - } - $this->flush($producer); - } - - - /** - * @param bool $ack - */ - public function setAsk(bool $ack) - { - $this->isAck = $ack; - $this->topicConf->set('request.required.acks', $this->isAck ? '1' : '0'); - } - - - /** - * @param \RdKafka\Producer $producer - */ - public function flush(\RdKafka\Producer $producer) - { - while ($producer->getOutQLen() > 0) { - $result = $producer->flush(100); - if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) { - break; - } - } - } - -} diff --git a/Kafka/Struct.php b/Kafka/Struct.php index d47efbdc..7f447899 100644 --- a/Kafka/Struct.php +++ b/Kafka/Struct.php @@ -9,13 +9,13 @@ use RdKafka\Message; class Struct { - public int $offset; + public ?int $offset; - public Message $message; - public string $topic; + public ?Message $message; + public ?string $topic; public mixed $value; - public int $part; + public ?int $part; /** * Struct constructor. diff --git a/System/Di/Container.php b/System/Di/Container.php index fc6e8452..55a6e08f 100644 --- a/System/Di/Container.php +++ b/System/Di/Container.php @@ -135,11 +135,8 @@ class Container extends BaseObject if ($construct->getNumberOfParameters() < 1) { return $reflect->newInstance(); } - if (!empty($dependencies)) { - $parameters = $this->mergeParam($this->resolveMethodParameters($construct), $dependencies); - return $reflect->newInstanceArgs($parameters); - } - return $reflect->newInstanceWithoutConstructor(); + $parameters = $this->mergeParam($this->resolveMethodParameters($construct), $dependencies); + return $reflect->newInstanceArgs($parameters); }