From aae349efddd8251f31b70090cef004f8ece3791d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mr=C2=B7x?= Date: Wed, 28 Oct 2020 16:01:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Kafka/Producer.php | 66 +++++++++++++++++++++++----- System/Abstracts/BaseApplication.php | 4 +- 2 files changed, 57 insertions(+), 13 deletions(-) diff --git a/Kafka/Producer.php b/Kafka/Producer.php index 113e2cd6..845790e8 100644 --- a/Kafka/Producer.php +++ b/Kafka/Producer.php @@ -16,7 +16,10 @@ namespace Kafka; use RdKafka\Conf; use RdKafka\TopicConf; +use ReflectionException; use Snowflake\Abstracts\Component; +use Snowflake\Exception\NotFindClassException; +use Snowflake\Snowflake; /** * +------------------------------------------------------------------------------ @@ -32,17 +35,59 @@ use Snowflake\Abstracts\Component; class Producer extends Component { + private string $_topic = ''; + + /** + * @param $servers + * @return Producer + * @throws NotFindClassException + * @throws ReflectionException + */ + public function setBrokers(string $servers) + { + /** @var Conf $conf */ + $conf = Snowflake::createObject(Conf::class); + $conf->set('metadata.broker.list', $servers); + return $this; + } + + + /** + * @param bool $value + * @return $this + * @throws NotFindClassException + * @throws ReflectionException + */ + public function setAck(bool $value) + { + /** @var TopicConf $conf */ + $conf = Snowflake::createObject(TopicConf::class); + $conf->set('request.required.acks', (int)$value); + + return $this; + } + + + /** + * @param $servers + * @return Producer + */ + public function setTopic(string $servers) + { + $this->_topic = $servers; + return $this; + } + + /** - * @param $topic - * @param $brokers * @param $message * @param null $key * @param int $timeout + * @throws */ - public function delivery($topic, $brokers, $message, $key = null, $timeout = 5) + public function delivery($message, $key = null, $timeout = 5) { - $conf = new Conf(); - $conf->set('metadata.broker.list', $brokers); + $conf = Snowflake::createObject(Conf::class); $conf->setDrmSgCb(function ($kafka, $message) { // $this->debug(var_export($message, true)); }); @@ -50,13 +95,12 @@ class Producer extends Component $this->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason)); }); - $cf = new TopicConf(); - $cf->set('request.required.acks', 1); - - $rk = new \RdKafka\Producer($conf); - $topic = $rk->newTopic($topic, $cf); + $rk = new \RdKafka\Producer(); + $topic = $rk->newTopic($this->_topic, Snowflake::createObject(TopicConf::class)); $topic->produce(RD_KAFKA_PARTITION_UA, 0, $message, $key); - $rk->poll($timeout); + if ($rk->getOutQLen() > 0) { + $rk->poll($timeout); + } $rk->flush($timeout); } } diff --git a/System/Abstracts/BaseApplication.php b/System/Abstracts/BaseApplication.php index d2261054..46e0161f 100644 --- a/System/Abstracts/BaseApplication.php +++ b/System/Abstracts/BaseApplication.php @@ -58,9 +58,9 @@ abstract class BaseApplication extends Service /** * @var string */ - public $storage = APP_PATH . '/storage'; + public string $storage = APP_PATH . '/storage'; - public $envPath = APP_PATH . '/.env'; + public string $envPath = APP_PATH . '/.env'; /** * Init constructor.