This commit is contained in:
2021-08-04 14:33:39 +08:00
parent e479529347
commit 864c62c345
3 changed files with 9 additions and 9 deletions
+3 -3
View File
@@ -45,11 +45,11 @@ class Producer extends Component
if (!class_exists(Conf::class)) {
return;
}
$this->conf = new Conf();
$this->topicConf = new TopicConf();
$this->conf = di(Conf::class);
$this->conf->setErrorCb(function ($kafka, $err, $reason) {
$this->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason));
});
$this->topicConf = di(TopicConf::class);
}
@@ -93,7 +93,7 @@ class Producer extends Component
*/
private function getProducer(): \RdKafka\Producer
{
return Snowflake::createObject(\RdKafka\Producer::class, [$this->conf]);
return Snowflake::getDi()->get(\RdKafka\Producer::class, [$this->conf]);
}
+1 -1
View File
@@ -20,7 +20,7 @@ class Struct
/**
* Struct constructor.
* @param $topic
* @param $message
* @param Message $message
*/
public function __construct($topic, Message $message)
{
+5 -5
View File
@@ -31,11 +31,11 @@ class TaskContainer extends BaseObject
}
/**
* @param $topic
* @return mixed|null
*/
public function getConsumer(string $topic)
/**
* @param string $topic
* @return mixed
*/
public function getConsumer(string $topic): mixed
{
return $this->_topics[$topic] ?? null;
}