diff --git a/Kafka/KafkaClient.php b/Kafka/KafkaClient.php index 82502243..0302b289 100644 --- a/Kafka/KafkaClient.php +++ b/Kafka/KafkaClient.php @@ -6,7 +6,9 @@ use Exception; use RdKafka\Conf; use RdKafka\ProducerTopic; use RdKafka\TopicConf; -use Snowflake\Abstracts\BaseObject; +use ReflectionException; +use Snowflake\Abstracts\Config; +use Snowflake\Exception\ConfigException; use Snowflake\Exception\NotFindClassException; use Snowflake\Snowflake; @@ -14,37 +16,47 @@ use Snowflake\Snowflake; /** * */ -class KafkaClient extends BaseObject +class KafkaClient { private Conf $conf; private TopicConf $topicConf; - private ?\RdKafka\Producer $producer = null; - private bool $isAck = true; /** * Producer constructor. * @param string $topic - * @param string $groupId - * @param string $brokers * @throws NotFindClassException - * @throws Exception + * @throws ReflectionException + * @throws ConfigException */ - public function __construct(public string $topic, string $groupId, string $brokers) + public function __construct(public string $topic) { - parent::__construct([]); $this->conf = di(Conf::class); - $this->conf->set('metadata.broker.list', $brokers); - $this->conf->set('group.id', $groupId); - $this->conf->setErrorCb(function ($kafka, $err, $reason) { - $this->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason)); - }); $this->topicConf = di(TopicConf::class); + $this->setConfig($this->conf); } + + /** + * @throws ConfigException + */ + private function setConfig(Conf $kafkaConfig) + { + $config = Config::get('producers.' . $this->topic, null, true); + if (!isset($config['brokers']) || !isset($config['groupId'])) { + throw new ConfigException('Please configure relevant information.'); + } + $kafkaConfig->set('metadata.broker.list', $config['brokers']); + $kafkaConfig->set('group.id', $config['groupId']); + $kafkaConfig->setErrorCb(function ($kafka, $err, $reason) { + logger()->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason)); + }); + } + + /** * @param array $params * @param bool $isAck