This commit is contained in:
2021-08-12 16:33:03 +08:00
parent b94e3f6629
commit 223f67065a
+8 -7
View File
@@ -4,6 +4,7 @@ namespace Kafka;
use Exception;
use RdKafka\Conf;
use RdKafka\Producer;
use RdKafka\ProducerTopic;
use RdKafka\TopicConf;
use ReflectionException;
@@ -82,22 +83,22 @@ class KafkaClient
/**
* @return \RdKafka\Producer
* @return Producer
* @throws Exception
*/
private function getProducer(): \RdKafka\Producer
private function getProducer(): Producer
{
return Kiri::getDi()->get(\RdKafka\Producer::class, [$this->conf]);
return Kiri::getDi()->get(Producer::class, [$this->conf]);
}
/**
* @param \RdKafka\Producer $producer
* @param Producer $producer
* @param $topic
* @param $isAck
* @return ProducerTopic
*/
private function getProducerTopic(\RdKafka\Producer $producer, $topic, $isAck): ProducerTopic
private function getProducerTopic(Producer $producer, $topic, $isAck): ProducerTopic
{
$this->topicConf->set('request.required.acks', $isAck ? '1' : '0');
return $producer->newTopic($topic, $this->topicConf);
@@ -126,9 +127,9 @@ class KafkaClient
/**
* @param \RdKafka\Producer $producer
* @param Producer $producer
*/
private function flush(\RdKafka\Producer $producer)
private function flush(Producer $producer)
{
while ($producer->getOutQLen() > 0) {
$result = $producer->flush(100);