This commit is contained in:
2021-08-23 11:08:40 +08:00
parent c1c0d862b9
commit 0e0376ce67
+4 -3
View File
@@ -29,11 +29,12 @@ class KafkaClient
/**
* Producer constructor.
* @param string $topic
* @param string $groupId
* @throws ConfigException
* @throws NotFindClassException
* @throws ReflectionException
* @throws ConfigException
*/
public function __construct(public string $topic)
public function __construct(public string $topic, public string $groupId)
{
$this->conf = di(Conf::class);
$this->topicConf = di(TopicConf::class);
@@ -51,7 +52,7 @@ class KafkaClient
throw new ConfigException('Please configure relevant information.');
}
$kafkaConfig->set('metadata.broker.list', $config['brokers']);
$kafkaConfig->set('group.id', $config['groupId']);
$kafkaConfig->set('group.id', $this->groupId);
$kafkaConfig->setErrorCb(function ($kafka, $err, $reason) {
logger()->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason));
});