This commit is contained in:
2021-08-05 15:21:44 +08:00
parent ba00de55cf
commit cbab515ad7
+26 -14
View File
@@ -6,7 +6,9 @@ use Exception;
use RdKafka\Conf;
use RdKafka\ProducerTopic;
use RdKafka\TopicConf;
use Snowflake\Abstracts\BaseObject;
use ReflectionException;
use Snowflake\Abstracts\Config;
use Snowflake\Exception\ConfigException;
use Snowflake\Exception\NotFindClassException;
use Snowflake\Snowflake;
@@ -14,37 +16,47 @@ use Snowflake\Snowflake;
/**
*
*/
class KafkaClient extends BaseObject
class KafkaClient
{
private Conf $conf;
private TopicConf $topicConf;
private ?\RdKafka\Producer $producer = null;
private bool $isAck = true;
/**
* Producer constructor.
* @param string $topic
* @param string $groupId
* @param string $brokers
* @throws NotFindClassException
* @throws Exception
* @throws ReflectionException
* @throws ConfigException
*/
public function __construct(public string $topic, string $groupId, string $brokers)
public function __construct(public string $topic)
{
parent::__construct([]);
$this->conf = di(Conf::class);
$this->conf->set('metadata.broker.list', $brokers);
$this->conf->set('group.id', $groupId);
$this->conf->setErrorCb(function ($kafka, $err, $reason) {
$this->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason));
});
$this->topicConf = di(TopicConf::class);
$this->setConfig($this->conf);
}
/**
* @throws ConfigException
*/
private function setConfig(Conf $kafkaConfig)
{
$config = Config::get('producers.' . $this->topic, null, true);
if (!isset($config['brokers']) || !isset($config['groupId'])) {
throw new ConfigException('Please configure relevant information.');
}
$kafkaConfig->set('metadata.broker.list', $config['brokers']);
$kafkaConfig->set('group.id', $config['groupId']);
$kafkaConfig->setErrorCb(function ($kafka, $err, $reason) {
logger()->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason));
});
}
/**
* @param array $params
* @param bool $isAck