From c17a1cfcc69afaee278ee8bca3609fa0db2876f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mr=C2=B7x?= Date: Thu, 5 Aug 2021 15:56:43 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Kafka/Kafka.php | 281 ++++++++++++++++++++++++------------------------ 1 file changed, 141 insertions(+), 140 deletions(-) diff --git a/Kafka/Kafka.php b/Kafka/Kafka.php index 9149fc79..5917bc36 100644 --- a/Kafka/Kafka.php +++ b/Kafka/Kafka.php @@ -10,13 +10,13 @@ use RdKafka\ConsumerTopic; use RdKafka\Exception; use RdKafka\KafkaConsumer; use RdKafka\TopicConf; +use ReflectionException; use Server\SInterface\CustomProcess; use Snowflake\Abstracts\Config; use Snowflake\Exception\ConfigException; -use Snowflake\Runtime; +use Snowflake\Exception\NotFindClassException; use Snowflake\Snowflake; use Swoole\Coroutine\Channel; -use Swoole\Coroutine\System; use Swoole\Process; use Throwable; @@ -27,17 +27,18 @@ use Throwable; class Kafka implements CustomProcess { - protected Channel $channel; + protected Channel $channel; - /** - * Kafka constructor. - * @param array $kafkaConfig - */ - public function __construct(public array $kafkaConfig) - { - scan_directory(); - } + /** + * @param array $kafkaConfig + * @throws ReflectionException + * @throws NotFindClassException + */ + public function __construct(public array $kafkaConfig) + { + scan_directory(directory('app'), 'App', [CONTROLLER_PATH]); + } /** @@ -45,22 +46,22 @@ class Kafka implements CustomProcess * @return string * @throws ConfigException */ - public function getProcessName(Process $process): string - { - $name = Config::get('id', 'system') . '[' . $process->pid . ']'; + public function getProcessName(Process $process): string + { + $name = Config::get('id', 'system') . '[' . $process->pid . ']'; - return $name . '.' . 'Kafka Consumer ' . $this->kafkaConfig['topic']; - } + return $name . '.' . 'Kafka Consumer ' . $this->kafkaConfig['topic']; + } - /** - * @param Process $process - * @throws \Exception - */ - public function onHandler(Process $process): void - { - $this->waite($process, $this->kafkaConfig); - } + /** + * @param Process $process + * @throws \Exception + */ + public function onHandler(Process $process): void + { + $this->waite($process, $this->kafkaConfig); + } /** @@ -68,141 +69,141 @@ class Kafka implements CustomProcess * @param array $kafkaServer * @throws \Exception */ - private function waite(Process $process, array $kafkaServer) - { - try { - [$config, $topic, $conf] = $this->kafkaConfig($kafkaServer); - if (empty($config) && empty($topic) && empty($conf)) { - return; - } - $objRdKafka = new Consumer($config); - $topic = $objRdKafka->newTopic($kafkaServer['topic'], $topic); + private function waite(Process $process, array $kafkaServer) + { + try { + [$config, $topic, $conf] = $this->kafkaConfig($kafkaServer); + if (empty($config) && empty($topic) && empty($conf)) { + return; + } + $objRdKafka = new Consumer($config); + $topic = $objRdKafka->newTopic($kafkaServer['topic'], $topic); - $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); - do { - $this->resolve($topic, $conf['interval'] ?? 1000); - } while (true); - } catch (Throwable $exception) { - logger()->addError($exception, 'throwable'); - } - } + $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); + do { + $this->resolve($topic, $conf['interval'] ?? 1000); + } while (true); + } catch (Throwable $exception) { + logger()->addError($exception, 'throwable'); + } + } - /** - * @param ConsumerTopic $topic - * @param $interval - * @throws \Exception - */ - private function resolve(ConsumerTopic $topic, $interval) - { - try { - $message = $topic->consume(0, $interval); - if (empty($message)) { - return; - } - if ($message->err == RD_KAFKA_RESP_ERR_NO_ERROR) { - $this->handlerExecute($message->topic_name, $message); - } else if ($message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { - logger()->warning('No more messages; will wait for more'); - } else if ($message->err == RD_KAFKA_RESP_ERR__TIMED_OUT) { - logger()->error('Kafka Timed out'); - } else { - logger()->error($message->errstr()); - } - } catch (Throwable $exception) { - logger()->addError($exception, 'throwable'); - } - } + /** + * @param ConsumerTopic $topic + * @param $interval + * @throws \Exception + */ + private function resolve(ConsumerTopic $topic, $interval) + { + try { + $message = $topic->consume(0, $interval); + if (empty($message)) { + return; + } + if ($message->err == RD_KAFKA_RESP_ERR_NO_ERROR) { + $this->handlerExecute($message->topic_name, $message); + } else if ($message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { + logger()->warning('No more messages; will wait for more'); + } else if ($message->err == RD_KAFKA_RESP_ERR__TIMED_OUT) { + logger()->error('Kafka Timed out'); + } else { + logger()->error($message->errstr()); + } + } catch (Throwable $exception) { + logger()->addError($exception, 'throwable'); + } + } - /** - * @param $topic - * @param $message - * @throws \Exception - */ - protected function handlerExecute($topic, $message) - { - go(function () use ($topic, $message) { - try { - $server = Snowflake::app()->getSwoole(); + /** + * @param $topic + * @param $message + * @throws \Exception + */ + protected function handlerExecute($topic, $message) + { + go(function () use ($topic, $message) { + try { + $server = Snowflake::app()->getSwoole(); - $setting = $server->setting['worker_num']; + $setting = $server->setting['worker_num']; - /** @var KafkaProvider $container */ - $container = Snowflake::app()->get('kafka-container'); - $handler = $container->getConsumer($topic); + /** @var KafkaProvider $container */ + $container = Snowflake::app()->get('kafka-container'); + $handler = $container->getConsumer($topic); - if (empty($handler)) { - return; - } + if (empty($handler)) { + return; + } - $message = swoole_serialize(['action' => 'kafka', 'handler' => $handler, 'body' => [$topic, $message]]); + $message = swoole_serialize(['action' => 'kafka', 'handler' => $handler, 'body' => [$topic, $message]]); - $server->sendMessage($message, random_int(0, $setting - 1)); - } catch (Throwable $exception) { - logger()->addError($exception, 'throwable'); - } - }); - } + $server->sendMessage($message, random_int(0, $setting - 1)); + } catch (Throwable $exception) { + logger()->addError($exception, 'throwable'); + } + }); + } - /** - * @param $kafka - * @return array - * @throws \Exception - */ - private function kafkaConfig($kafka): array - { - try { - $conf = new Conf(); - $conf->setRebalanceCb([$this, 'rebalanced_cb']); - $conf->set('group.id', $kafka['groupId']); - $conf->set('metadata.broker.list', $kafka['brokers']); - $conf->set('socket.timeout.ms', '30000'); + /** + * @param $kafka + * @return array + * @throws \Exception + */ + private function kafkaConfig($kafka): array + { + try { + $conf = new Conf(); + $conf->setRebalanceCb([$this, 'rebalanced_cb']); + $conf->set('group.id', $kafka['groupId']); + $conf->set('metadata.broker.list', $kafka['brokers']); + $conf->set('socket.timeout.ms', '30000'); - debug('kafka listen groupId ' . $kafka['groupId']); - debug('kafka listen brokers ' . $kafka['brokers']); + debug('kafka listen groupId ' . $kafka['groupId']); + debug('kafka listen brokers ' . $kafka['brokers']); - if (function_exists('pcntl_sigprocmask')) { - pcntl_sigprocmask(SIG_BLOCK, array(SIGIO)); - $conf->set('internal.termination.signal', (string)SIGIO); - } + if (function_exists('pcntl_sigprocmask')) { + pcntl_sigprocmask(SIG_BLOCK, array(SIGIO)); + $conf->set('internal.termination.signal', (string)SIGIO); + } - $topicConf = new TopicConf(); - $topicConf->set('auto.commit.enable', '1'); - $topicConf->set('auto.commit.interval.ms', '100'); + $topicConf = new TopicConf(); + $topicConf->set('auto.commit.enable', '1'); + $topicConf->set('auto.commit.interval.ms', '100'); - //smallest:简单理解为从头开始消费, - //largest:简单理解为从最新的开始消费 - $topicConf->set('auto.offset.reset', 'smallest'); - $topicConf->set('offset.store.path', 'kafka_offset.log'); - $topicConf->set('offset.store.method', 'broker'); + //smallest:简单理解为从头开始消费, + //largest:简单理解为从最新的开始消费 + $topicConf->set('auto.offset.reset', 'smallest'); + $topicConf->set('offset.store.path', 'kafka_offset.log'); + $topicConf->set('offset.store.method', 'broker'); - return [$conf, $topicConf, $kafka]; - } catch (Throwable $exception) { - logger()->addError($exception, 'throwable'); - return [null, null, null]; - } - } + return [$conf, $topicConf, $kafka]; + } catch (Throwable $exception) { + logger()->addError($exception, 'throwable'); + return [null, null, null]; + } + } - /** - * @param KafkaConsumer $kafka - * @param $err - * @param array|null $partitions - * @throws Exception - * @throws \Exception - */ - public function rebalanced_cb(KafkaConsumer $kafka, $err, array $partitions = null) - { - if ($err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) { - $kafka->assign($partitions); - } else if ($err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) { - $kafka->assign(NULL); - } else { - throw new \Exception($err); - } - } + /** + * @param KafkaConsumer $kafka + * @param $err + * @param array|null $partitions + * @throws Exception + * @throws \Exception + */ + public function rebalanced_cb(KafkaConsumer $kafka, $err, array $partitions = null) + { + if ($err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) { + $kafka->assign($partitions); + } else if ($err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) { + $kafka->assign(NULL); + } else { + throw new \Exception($err); + } + } }