From 46bcbb4f4b13cb13020fc53c41b4168437b1e446 Mon Sep 17 00:00:00 2001 From: "as2252258@163.com" Date: Tue, 20 Jul 2021 01:56:04 +0800 Subject: [PATCH] modify --- Kafka/Kafka.php | 327 +++++++++++++++++++-------------------- Kafka/KafkaProviders.php | 50 +++--- 2 files changed, 180 insertions(+), 197 deletions(-) diff --git a/Kafka/Kafka.php b/Kafka/Kafka.php index 7ba8c034..1d5071e5 100644 --- a/Kafka/Kafka.php +++ b/Kafka/Kafka.php @@ -10,6 +10,7 @@ use RdKafka\ConsumerTopic; use RdKafka\Exception; use RdKafka\KafkaConsumer; use RdKafka\TopicConf; +use Server\SInterface\CustomProcess; use Snowflake\Abstracts\Config; use Snowflake\Exception\ConfigException; use Snowflake\Runtime; @@ -23,201 +24,185 @@ use Throwable; * Class Queue * @package Queue */ -class Kafka extends \Snowflake\Process\Process +class Kafka implements CustomProcess { - protected Channel $channel; - - private int $maxLength = 5000; + protected Channel $channel; - /** - * @var array - */ - private array $kafkaConfig = []; + /** + * Kafka constructor. + * @param array $kafkaConfig + */ + public function __construct(public array $kafkaConfig) + { + + } - /** - * @return string - * @throws ConfigException - */ - public function getProcessName(): string - { - $this->kafkaConfig = swoole_unserialize($this->read()); + /** + * @return string + * @throws ConfigException + */ + public function getProcessName(Process $process): string + { + $name = Config::get('id', 'system') . '[' . $process->pid . ']'; - $name = Config::get('id', 'system') . '[' . $this->pid . ']'; - - return $name . '.' . 'Kafka Consumer ' . $this->kafkaConfig['topic']; - } + return $name . '.' . 'Kafka Consumer ' . $this->kafkaConfig['topic']; + } - /** - * @param Process $process - * @throws \Exception - */ - public function before(Process $process): void - { - $content = System::readFile(storage(Runtime::CACHE_NAME)); - - $annotation = Snowflake::app()->getAnnotation(); - $annotation->setLoader(unserialize($content)); - $annotation->runtime(KAFKA_PATH); - } + /** + * @param Process $process + * @throws \Exception + */ + public function onHandler(Process $process): void + { + $this->waite($process, $this->kafkaConfig); + } - /** - * @param Process $process - * @throws \Exception - */ - public function onHandler(Process $process): void - { - $this->waite($this->kafkaConfig); - } + /** + * @param array $kafkaServer + * @throws \Exception + */ + private function waite(Process $process, array $kafkaServer) + { + try { + [$config, $topic, $conf] = $this->kafkaConfig($kafkaServer); + if (empty($config) && empty($topic) && empty($conf)) { + return; + } + $objRdKafka = new Consumer($config); + $topic = $objRdKafka->newTopic($kafkaServer['topic'], $topic); + + $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); + do { + $this->resolve($topic, $conf['interval'] ?? 1000); + } while (true); + } catch (Throwable $exception) { + logger()->addError($exception, 'throwable'); + } + } - /** - * @param array $kafkaServer - * @throws \Exception - */ - private function waite(array $kafkaServer) - { - try { - name($this->pid, 'Kafka Consumer ' . $kafkaServer['topic']); - - [$config, $topic, $conf] = $this->kafkaConfig($kafkaServer); - if (empty($config) && empty($topic) && empty($conf)) { - return; - } - $objRdKafka = new Consumer($config); - $topic = $objRdKafka->newTopic($kafkaServer['topic'], $topic); - - $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); - do { - $this->resolve($topic, $conf['interval'] ?? 1000); - } while (true); - } catch (Throwable $exception) { - logger()->addError($exception, 'throwable'); - } - } + /** + * @param ConsumerTopic $topic + * @param $interval + * @throws \Exception + */ + private function resolve(ConsumerTopic $topic, $interval) + { + try { + $message = $topic->consume(0, $interval); + if (empty($message)) { + return; + } + if ($message->err == RD_KAFKA_RESP_ERR_NO_ERROR) { + $this->handlerExecute($message->topic_name, $message); + } else if ($message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { + logger()->warning('No more messages; will wait for more'); + } else if ($message->err == RD_KAFKA_RESP_ERR__TIMED_OUT) { + logger()->error('Kafka Timed out'); + } else { + logger()->error($message->errstr()); + } + } catch (Throwable $exception) { + logger()->addError($exception, 'throwable'); + } + } - /** - * @param ConsumerTopic $topic - * @param $interval - * @throws \Exception - */ - private function resolve(ConsumerTopic $topic, $interval) - { - try { - $message = $topic->consume(0, $interval); - if (empty($message)) { - return; - } - if ($message->err == RD_KAFKA_RESP_ERR_NO_ERROR) { - $this->handlerExecute($message->topic_name, $message); - } else if ($message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) { - logger()->warning('No more messages; will wait for more'); - } else if ($message->err == RD_KAFKA_RESP_ERR__TIMED_OUT) { - logger()->error('Kafka Timed out'); - } else { - logger()->error($message->errstr()); - } - } catch (Throwable $exception) { - logger()->addError($exception, 'throwable'); - } - } + /** + * @param $topic + * @param $message + * @throws \Exception + */ + protected function handlerExecute($topic, $message) + { + go(function () use ($topic, $message) { + try { + $server = Snowflake::app()->getSwoole(); + + $setting = $server->setting['worker_num']; + + /** @var TaskContainer $container */ + $container = Snowflake::app()->get('kafka-container'); + $handler = $container->getConsumer($topic); + + if (empty($handler)) { + return; + } + + $message = swoole_serialize(['action' => 'kafka', 'handler' => $handler, 'body' => [$topic, $message]]); + + $server->sendMessage($message, random_int(0, $setting - 1)); + } catch (Throwable $exception) { + logger()->addError($exception, 'throwable'); + } + }); + } - /** - * @param $topic - * @param $message - * @throws \Exception - */ - protected function handlerExecute($topic, $message) - { - go(function () use ($topic, $message) { - try { - $server = Snowflake::app()->getSwoole(); + /** + * @param $kafka + * @return array + * @throws \Exception + */ + private function kafkaConfig($kafka): array + { + try { + $conf = new Conf(); + $conf->setRebalanceCb([$this, 'rebalanced_cb']); + $conf->set('group.id', $kafka['groupId']); + $conf->set('metadata.broker.list', $kafka['brokers']); + $conf->set('socket.timeout.ms', '30000'); - $setting = $server->setting['worker_num']; + debug('kafka listen groupId ' . $kafka['groupId']); + debug('kafka listen brokers ' . $kafka['brokers']); - /** @var TaskContainer $container */ - $container = Snowflake::app()->get('kafka-container'); - $handler = $container->getConsumer($topic); + if (function_exists('pcntl_sigprocmask')) { + pcntl_sigprocmask(SIG_BLOCK, array(SIGIO)); + $conf->set('internal.termination.signal', (string)SIGIO); + } - if (empty($handler)) { - return; - } + $topicConf = new TopicConf(); + $topicConf->set('auto.commit.enable', '1'); + $topicConf->set('auto.commit.interval.ms', '100'); - $message = swoole_serialize(['action' => 'kafka', 'handler' => $handler, 'body' => [$topic, $message]]); + //smallest:简单理解为从头开始消费, + //largest:简单理解为从最新的开始消费 + $topicConf->set('auto.offset.reset', 'smallest'); + $topicConf->set('offset.store.path', 'kafka_offset.log'); + $topicConf->set('offset.store.method', 'broker'); - $server->sendMessage($message, random_int(0, $setting - 1)); - } catch (Throwable $exception) { - logger()->addError($exception, 'throwable'); - } - }); - } + return [$conf, $topicConf, $kafka]; + } catch (Throwable $exception) { + logger()->addError($exception, 'throwable'); + + return [null, null, null]; + } + + } - /** - * @param $kafka - * @return array - * @throws \Exception - */ - private function kafkaConfig($kafka): array - { - try { - $conf = new Conf(); - $conf->setRebalanceCb([$this, 'rebalanced_cb']); - $conf->set('group.id', $kafka['groupId']); - $conf->set('metadata.broker.list', $kafka['brokers']); - $conf->set('socket.timeout.ms', '30000'); - - debug('kafka listen groupId ' . $kafka['groupId']); - debug('kafka listen brokers ' . $kafka['brokers']); - - if (function_exists('pcntl_sigprocmask')) { - pcntl_sigprocmask(SIG_BLOCK, array(SIGIO)); - $conf->set('internal.termination.signal', (string)SIGIO); - } - - $topicConf = new TopicConf(); - $topicConf->set('auto.commit.enable', '1'); - $topicConf->set('auto.commit.interval.ms', '100'); - - //smallest:简单理解为从头开始消费, - //largest:简单理解为从最新的开始消费 - $topicConf->set('auto.offset.reset', 'smallest'); - $topicConf->set('offset.store.path', 'kafka_offset.log'); - $topicConf->set('offset.store.method', 'broker'); - - return [$conf, $topicConf, $kafka]; - } catch (Throwable $exception) { - logger()->addError($exception, 'throwable'); - - return [null, null, null]; - } - - } - - - /** - * @param KafkaConsumer $kafka - * @param $err - * @param array|null $partitions - * @throws Exception - * @throws \Exception - */ - public function rebalanced_cb(KafkaConsumer $kafka, $err, array $partitions = null) - { - if ($err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) { - $kafka->assign($partitions); - } else if ($err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) { - $kafka->assign(NULL); - } else { - throw new \Exception($err); - } - } + /** + * @param KafkaConsumer $kafka + * @param $err + * @param array|null $partitions + * @throws Exception + * @throws \Exception + */ + public function rebalanced_cb(KafkaConsumer $kafka, $err, array $partitions = null) + { + if ($err == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) { + $kafka->assign($partitions); + } else if ($err == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) { + $kafka->assign(NULL); + } else { + throw new \Exception($err); + } + } } diff --git a/Kafka/KafkaProviders.php b/Kafka/KafkaProviders.php index 44a73cf7..b5335ced 100644 --- a/Kafka/KafkaProviders.php +++ b/Kafka/KafkaProviders.php @@ -19,31 +19,29 @@ use Snowflake\Application; class KafkaProviders extends Providers { - /** - * @param Application $application - * @throws Exception - */ - public function onImport(Application $application) - { - /** @var Server $server */ - $server = $application->get('server'); - $application->set('kafka', ['class' => Producer::class]); - $kafka = SConfig::get('kafka'); - if (empty($kafka) || !($kafka['enable'] ?? false)) { - return; - } - if (!extension_loaded('rdkafka')) { - return; - } - - $kafkaServers = Config::get('kafka.consumers', []); - if (empty($kafkaServers)) { - return; - } - - foreach ($kafkaServers as $index => $kafkaServer) { - $server->addProcess('kafka_' . $index, Kafka::class, $kafkaServer); - } - } + /** + * @param Application $application + * @throws Exception + */ + public function onImport(Application $application) + { + /** @var Server $server */ + $server = $application->get('server'); + $application->set('kafka', ['class' => Producer::class]); + $kafka = SConfig::get('kafka'); + if (empty($kafka) || !($kafka['enable'] ?? false)) { + return; + } + if (!extension_loaded('rdkafka')) { + return; + } + $kafkaServers = Config::get('kafka.consumers', []); + if (empty($kafkaServers)) { + return; + } + foreach ($kafkaServers as $kafkaServer) { + $server->addProcess(new Kafka($kafkaServer)); + } + } }