diff --git a/HttpServer/Client/Client.php b/HttpServer/Client/Help/Client.php similarity index 93% rename from HttpServer/Client/Client.php rename to HttpServer/Client/Help/Client.php index 10a740ab..51bed866 100644 --- a/HttpServer/Client/Client.php +++ b/HttpServer/Client/Help/Client.php @@ -7,7 +7,7 @@ */ declare(strict_types=1); -namespace HttpServer\Client; +namespace HttpServer\Client\Help; use Exception; use JetBrains\PhpStorm\Pure; @@ -27,7 +27,7 @@ class Client extends ClientAbstracts * @return array|string|Result * @throws Exception */ - public function request(string $method, $path, $params = []): array|string|Result + public function request(string $method, $path, array $params = []): array|string|Result { return $this->setMethod($method) ->coroutine( @@ -44,7 +44,7 @@ class Client extends ClientAbstracts * @throws Exception * 使用swoole协程方式请求 */ - private function coroutine($url, $data = []): array|string|Result + private function coroutine($url, array $data = []): array|string|Result { try { $client = $this->generate_client($data, ...$url); @@ -99,7 +99,7 @@ class Client extends ClientAbstracts /** - * @param $client + * @param SClient $client * @param $path * @param $data * @return string diff --git a/HttpServer/Client/ClientAbstracts.php b/HttpServer/Client/Help/ClientAbstracts.php similarity index 99% rename from HttpServer/Client/ClientAbstracts.php rename to HttpServer/Client/Help/ClientAbstracts.php index fc6c5701..7ef25b2a 100644 --- a/HttpServer/Client/ClientAbstracts.php +++ b/HttpServer/Client/Help/ClientAbstracts.php @@ -1,7 +1,7 @@ joinGetParams($path, $params); diff --git a/HttpServer/Client/IClient.php b/HttpServer/Client/Help/IClient.php similarity index 99% rename from HttpServer/Client/IClient.php rename to HttpServer/Client/Help/IClient.php index 80d4d8ed..e6c34f91 100644 --- a/HttpServer/Client/IClient.php +++ b/HttpServer/Client/Help/IClient.php @@ -1,7 +1,7 @@ getClient($domain, $ssl, $timeout); $client->send($request); - if (Context::getContext('http2isRecv') === false) { + if (Context::getContext('http2isRev') === false) { return null; } - return $this->recv($client); + return $this->rev($client); } @@ -216,7 +219,7 @@ class Http2 extends Component * @return mixed * @throws Exception */ - private function recv($client): mixed + private function rev($client): mixed { /** @var Response $response */ if (!Context::hasContext('http2timeout')) { @@ -246,7 +249,7 @@ class Http2 extends Component * @return mixed * @throws Exception */ - public function put($domain, $path, $params = [], $timeout = -1): Result + public function put($domain, $path, array $params = [], int $timeout = -1): Result { $request = $this->dispatch($domain, $path, 'PUT', $params, $timeout); diff --git a/HttpServer/Client/HttpClient.php b/HttpServer/Client/HttpClient.php index 4b2bbdd1..4c46db35 100644 --- a/HttpServer/Client/HttpClient.php +++ b/HttpServer/Client/HttpClient.php @@ -5,11 +5,12 @@ namespace HttpServer\Client; use Exception; -use JetBrains\PhpStorm\Pure; use Snowflake\Abstracts\Component; use Snowflake\Snowflake; use Swoole\Coroutine; - +use HttpServer\Client\Help\IClient; +use HttpServer\Client\Help\Client; +use HttpServer\Client\Help\Curl; /** * Class ClientDriver diff --git a/Kafka/KafkaClient.php b/Kafka/KafkaClient.php new file mode 100644 index 00000000..82502243 --- /dev/null +++ b/Kafka/KafkaClient.php @@ -0,0 +1,129 @@ +conf = di(Conf::class); + $this->conf->set('metadata.broker.list', $brokers); + $this->conf->set('group.id', $groupId); + $this->conf->setErrorCb(function ($kafka, $err, $reason) { + $this->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason)); + }); + $this->topicConf = di(TopicConf::class); + } + + /** + * @param array $params + * @param bool $isAck + * @throws Exception + */ + public function dispatch(array $params = [], bool $isAck = false) + { + $this->sendMessage([$params], $isAck); + } + + + /** + * @param string|null $key + * @param array $data + * @param bool $isAck + * @throws Exception + */ + public function batch(?string $key, array $data, bool $isAck = false) + { + $this->sendMessage($data, $key, $isAck); + } + + + /** + * @return \RdKafka\Producer + * @throws Exception + */ + private function getProducer(): \RdKafka\Producer + { + return Snowflake::getDi()->get(\RdKafka\Producer::class, [$this->conf]); + } + + + /** + * @param \RdKafka\Producer $producer + * @param $topic + * @param $isAck + * @return ProducerTopic + */ + private function getProducerTopic(\RdKafka\Producer $producer, $topic, $isAck): ProducerTopic + { + $this->topicConf->set('request.required.acks', $isAck ? '1' : '0'); + return $producer->newTopic($topic, $this->topicConf); + } + + + /** + * @param array $message + * @param string $key + * @param bool $isAck + * @throws Exception + */ + private function sendMessage(array $message, string $key = '', bool $isAck = false) + { + $producer = $this->getProducer(); + $producerTopic = $this->getProducerTopic($producer, $this->topic, $isAck); + if ($this->isAck) { + $this->flush($producer); + } + foreach ($message as $value) { + $producerTopic->produce(RD_KAFKA_PARTITION_UA, 0, swoole_serialize($value), $key); + $producer->poll(0); + } + $this->flush($producer); + } + + + /** + * @param \RdKafka\Producer $producer + */ + private function flush(\RdKafka\Producer $producer) + { + while ($producer->getOutQLen() > 0) { + $result = $producer->flush(100); + if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) { + break; + } + } + } + + +} diff --git a/Kafka/KafkaImports.php b/Kafka/KafkaImports.php index ea6e46d6..eaaa89e7 100644 --- a/Kafka/KafkaImports.php +++ b/Kafka/KafkaImports.php @@ -19,29 +19,28 @@ use Snowflake\Application; class KafkaImports extends Providers { - /** - * @param Application $application - * @throws Exception - */ - public function onImport(Application $application) - { - /** @var Server $server */ - $server = $application->get('server'); - $application->set('kafka', ['class' => Producer::class]); - $kafka = SConfig::get('kafka'); - if (empty($kafka) || !($kafka['enable'] ?? false)) { - return; - } - if (!extension_loaded('rdkafka')) { - return; - } - $kafkaServers = Config::get('kafka.consumers', []); - if (empty($kafkaServers)) { - return; - } - foreach ($kafkaServers as $kafkaServer) { - $server->addProcess(new Kafka($kafkaServer)); - } - } + /** + * @param Application $application + * @throws Exception + */ + public function onImport(Application $application) + { + if (!extension_loaded('rdkafka')) { + return; + } + $kafka = SConfig::get('kafka', ['enable' => false]); + if (($kafka['enable'] ?? false) == false) { + return; + } + $kafkaServers = Config::get('kafka.consumers', []); + if (empty($kafkaServers)) { + return; + } + /** @var Server $server */ + $server = $application->get('server'); + foreach ($kafkaServers as $kafkaServer) { + $server->addProcess(new Kafka($kafkaServer)); + } + } } diff --git a/Kafka/Producer.php b/Kafka/Producer.php index 606b02af..fb541829 100644 --- a/Kafka/Producer.php +++ b/Kafka/Producer.php @@ -7,9 +7,12 @@ use Exception; use RdKafka\Conf; use RdKafka\ProducerTopic; use RdKafka\TopicConf; +use ReflectionException; +use Snowflake\Abstracts\BaseObject; use Snowflake\Abstracts\Component; use Snowflake\Abstracts\Config; use Snowflake\Exception\ConfigException; +use Snowflake\Exception\NotFindClassException; use Snowflake\Snowflake; /** @@ -23,7 +26,7 @@ use Snowflake\Snowflake; * @author $_SWANBR_AUTHOR_$ * +------------------------------------------------------------------------------ */ -class Producer extends Component +class Producer extends BaseObject { @@ -36,55 +39,33 @@ class Producer extends Component /** * Producer constructor. - * @param array $config + * @param string $topic + * @param string $groupId + * @param string $brokers + * @throws ReflectionException + * @throws NotFindClassException * @throws Exception */ - public function __construct(array $config = []) + public function __construct(public string $topic, string $groupId, string $brokers) { - parent::__construct($config); - if (!class_exists(Conf::class)) { - return; - } + parent::__construct([]); $this->conf = di(Conf::class); + $this->conf->set('metadata.broker.list', $brokers); + $this->conf->set('group.id', $groupId); $this->conf->setErrorCb(function ($kafka, $err, $reason) { $this->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason)); }); $this->topicConf = di(TopicConf::class); } - /** - * @param string $servers - * @return Producer - */ - public function setBrokers(string $servers): static - { - $this->conf->set('metadata.broker.list', $servers); - return $this; - } - - - /** - * @param string $groupId - * @return Producer - */ - public function setGroupId(string $groupId): static - { - $this->conf->set('group.id', $groupId); - return $this; - } - - - /** - * @param string $topic * @param array $params * @param string|null $groupId * @throws Exception */ - public function dispatch(string $topic, array $params = [], string $groupId = null) + public function dispatch(array $params = [], string $groupId = null) { - $this->beforePushMessage($topic, $groupId); - $this->sendMessage($topic, [$params]); + $this->sendMessage([$params]); } /** @@ -110,56 +91,27 @@ class Producer extends Component /** - * @param string $toPic * @param string|null $key * @param array $data * @param string|null $groupId * @throws ConfigException * @throws Exception */ - public function batch(string $toPic, ?string $key, array $data, ?string $groupId = null) + public function batch(?string $key, array $data, ?string $groupId = null) { - $this->beforePushMessage($toPic, $groupId); - - $this->sendMessage($toPic, $data, $key); + $this->sendMessage($data, $key); } /** - * @param $topic - * @param $groupId - * @throws ConfigException - * @throws Exception - */ - private function beforePushMessage($topic, $groupId): void - { - $consumers = Config::get('kafka.producers.' . $topic); - if (empty($consumers) || !is_array($consumers)) { - throw new Exception('You need set kafka.producers config'); - } - if (!isset($consumers['brokers'])) { - throw new Exception('You need set brokers config.'); - } - if (!empty($groupId)) { - $consumers['groupId'] = $groupId; - } else if (!isset($consumers['groupId'])) { - $consumers['groupId'] = $topic . ':' . Snowflake::localhost(); - } - $this->setGroupId($consumers['groupId']); - $this->setBrokers($consumers['brokers']); - } - - - /** - * @param string $topic * @param array $message * @param string $key * @throws Exception */ - private function sendMessage(string $topic, array $message, string $key = '') + private function sendMessage(array $message, string $key = '') { $producer = $this->getProducer(); - $producerTopic = $this->getProducerTopic($producer, $topic); + $producerTopic = $this->getProducerTopic($producer, $this->topic); if ($this->isAck) { $this->flush($producer); } diff --git a/System/Abstracts/BaseApplication.php b/System/Abstracts/BaseApplication.php index 70e3f362..a40268bc 100644 --- a/System/Abstracts/BaseApplication.php +++ b/System/Abstracts/BaseApplication.php @@ -22,7 +22,6 @@ use HttpServer\Route\Router; use HttpServer\Server; use HttpServer\Shutdown; use JetBrains\PhpStorm\Pure; -use Kafka\Producer; use Kafka\KafkaProvider; use ReflectionException; use Server\ServerManager; @@ -167,22 +166,22 @@ abstract class BaseApplication extends Component } - /** - * @param $name - * @return mixed - * @throws \ReflectionException - * @throws \Snowflake\Exception\NotFindClassException - */ + /** + * @param $name + * @return mixed + * @throws \ReflectionException + * @throws \Snowflake\Exception\NotFindClassException + */ public function __get($name): mixed - { - if ($this->has($name)) { - return $this->get($name); - } - return parent::__get($name); // TODO: Change the autogenerated stub - } + { + if ($this->has($name)) { + return $this->get($name); + } + return parent::__get($name); // TODO: Change the autogenerated stub + } - /** + /** * @param $config * * @throws @@ -262,17 +261,16 @@ abstract class BaseApplication extends Component } - /** - * @param $name - * @return mixed - * @throws \ReflectionException - * @throws \Snowflake\Exception\NotFindClassException - */ + /** + * @param $name + * @return mixed + * @throws \ReflectionException + * @throws \Snowflake\Exception\NotFindClassException + */ public function get($name): mixed - { - return di(Service::class)->get($name); - } - + { + return di(Service::class)->get($name); + } /** @@ -302,16 +300,6 @@ abstract class BaseApplication extends Component } - /** - * @return Producer - * @throws Exception - */ - public function getKafka(): Producer - { - return $this->get('kafka'); - } - - /** * @return \Redis|Redis * @throws Exception @@ -461,41 +449,39 @@ abstract class BaseApplication extends Component } - - /** - * @param $array - * @throws \ReflectionException - * @throws \Snowflake\Exception\NotFindClassException - */ + /** + * @param $array + * @throws \ReflectionException + * @throws \Snowflake\Exception\NotFindClassException + */ private function setComponents($array): void - { - di(Service::class)->setComponents($array); - } + { + di(Service::class)->setComponents($array); + } - /** - * @param $id - * @param $definition - * @throws \ReflectionException - * @throws \Snowflake\Exception\NotFindClassException - */ + /** + * @param $id + * @param $definition + * @throws \ReflectionException + * @throws \Snowflake\Exception\NotFindClassException + */ public function set($id, $definition): void - { - di(Service::class)->set($id, $definition); - } + { + di(Service::class)->set($id, $definition); + } - /** - * @param $id - * @param $definition - * @throws \ReflectionException - * @throws \Snowflake\Exception\NotFindClassException - */ + /** + * @param $id + * @param $definition + * @throws \ReflectionException + * @throws \Snowflake\Exception\NotFindClassException + */ public function has($id): bool - { - return di(Service::class)->has($id); - } - + { + return di(Service::class)->has($id); + } /** @@ -514,7 +500,7 @@ abstract class BaseApplication extends Component 'router' => ['class' => Router::class], 'event' => ['class' => Event::class], 'redis' => ['class' => Redis::class], - 'databases' => ['class' => \Database\Connection::class], + 'databases' => ['class' => \Database\Connection::class], 'aop' => ['class' => Aop::class], 'input' => ['class' => HttpParams::class], 'header' => ['class' => HttpHeaders::class], diff --git a/System/Abstracts/TraitApplication.php b/System/Abstracts/TraitApplication.php index 688cbf96..0120e218 100644 --- a/System/Abstracts/TraitApplication.php +++ b/System/Abstracts/TraitApplication.php @@ -6,8 +6,8 @@ namespace Snowflake\Abstracts; use Annotation\Annotation as SAnnotation; use Database\DatabasesProviders; -use HttpServer\Client\Client; -use HttpServer\Client\Curl; +use HttpServer\Client\Help\Client; +use HttpServer\Client\Help\Curl; use HttpServer\Client\Http2; use HttpServer\Http\Request; use HttpServer\Http\Response; @@ -15,7 +15,6 @@ use HttpServer\HttpFilter; use HttpServer\Route\Router; use HttpServer\Server; use HttpServer\Shutdown; -use Kafka\Producer; use Rpc\Producer as RPCProducer; use Snowflake\Async; use Snowflake\Cache\Redis; @@ -42,7 +41,6 @@ use Snowflake\Pool\Pool; * @property SAnnotation $annotation * @property Http2 $http2 * @property BaseGoto $goto - * @property Producer $kafka * @property Client $client * @property \Database\Connection $databases * @property Curl $curl