This commit is contained in:
2021-08-05 15:15:42 +08:00
parent 5f57b77a53
commit ba00de55cf
12 changed files with 244 additions and 176 deletions
@@ -7,7 +7,7 @@
*/
declare(strict_types=1);
namespace HttpServer\Client;
namespace HttpServer\Client\Help;
use Exception;
use JetBrains\PhpStorm\Pure;
@@ -27,7 +27,7 @@ class Client extends ClientAbstracts
* @return array|string|Result
* @throws Exception
*/
public function request(string $method, $path, $params = []): array|string|Result
public function request(string $method, $path, array $params = []): array|string|Result
{
return $this->setMethod($method)
->coroutine(
@@ -44,7 +44,7 @@ class Client extends ClientAbstracts
* @throws Exception
* 使用swoole协程方式请求
*/
private function coroutine($url, $data = []): array|string|Result
private function coroutine($url, array $data = []): array|string|Result
{
try {
$client = $this->generate_client($data, ...$url);
@@ -99,7 +99,7 @@ class Client extends ClientAbstracts
/**
* @param $client
* @param SClient $client
* @param $path
* @param $data
* @return string
@@ -1,7 +1,7 @@
<?php
namespace HttpServer\Client;
namespace HttpServer\Client\Help;
use Closure;
@@ -1,7 +1,7 @@
<?php
declare(strict_types=1);
namespace HttpServer\Client;
namespace HttpServer\Client\Help;
use Exception;
@@ -22,7 +22,7 @@ class Curl extends ClientAbstracts
* @return Result|array|string
* @throws Exception
*/
public function request($method, $path, $params = []): Result|array|string
public function request($method, $path, array $params = []): Result|array|string
{
if ($method == self::GET) {
$path = $this->joinGetParams($path, $params);
@@ -1,7 +1,7 @@
<?php
namespace HttpServer\Client;
namespace HttpServer\Client\Help;
use Closure;
@@ -1,7 +1,7 @@
<?php
declare(strict_types=1);
namespace HttpServer\Client;
namespace HttpServer\Client\Help;
use Exception;
+10 -7
View File
@@ -6,11 +6,14 @@ namespace HttpServer\Client;
use Exception;
use HttpServer\Http\Context;
use Server\Events\OnAfterRequest;
use Snowflake\Abstracts\Component;
use Snowflake\Channel;
use Snowflake\Core\Json;
use Snowflake\Core\Xml;
use Snowflake\Event;
use Snowflake\Events\EventProvider;
use Snowflake\Snowflake;
use Swoole\Coroutine\Http2\Client as H2Client;
use Swoole\Http2\Request;
use Swoole\Http2\Response;
@@ -60,12 +63,12 @@ class Http2 extends Component
/**
* @param bool $isRecv
* @param bool $isRev
* @return Http2
*/
public function setIsRecv(bool $isRecv): static
public function setIsRev(bool $isRev): static
{
Context::setContext('http2isRecv', $isRecv);
Context::setContext('http2isRev', $isRev);
return $this;
}
@@ -204,10 +207,10 @@ class Http2 extends Component
{
$client = $this->getClient($domain, $ssl, $timeout);
$client->send($request);
if (Context::getContext('http2isRecv') === false) {
if (Context::getContext('http2isRev') === false) {
return null;
}
return $this->recv($client);
return $this->rev($client);
}
@@ -216,7 +219,7 @@ class Http2 extends Component
* @return mixed
* @throws Exception
*/
private function recv($client): mixed
private function rev($client): mixed
{
/** @var Response $response */
if (!Context::hasContext('http2timeout')) {
@@ -246,7 +249,7 @@ class Http2 extends Component
* @return mixed
* @throws Exception
*/
public function put($domain, $path, $params = [], $timeout = -1): Result
public function put($domain, $path, array $params = [], int $timeout = -1): Result
{
$request = $this->dispatch($domain, $path, 'PUT', $params, $timeout);
+3 -2
View File
@@ -5,11 +5,12 @@ namespace HttpServer\Client;
use Exception;
use JetBrains\PhpStorm\Pure;
use Snowflake\Abstracts\Component;
use Snowflake\Snowflake;
use Swoole\Coroutine;
use HttpServer\Client\Help\IClient;
use HttpServer\Client\Help\Client;
use HttpServer\Client\Help\Curl;
/**
* Class ClientDriver
+129
View File
@@ -0,0 +1,129 @@
<?php
namespace Kafka;
use Exception;
use RdKafka\Conf;
use RdKafka\ProducerTopic;
use RdKafka\TopicConf;
use Snowflake\Abstracts\BaseObject;
use Snowflake\Exception\NotFindClassException;
use Snowflake\Snowflake;
/**
*
*/
class KafkaClient extends BaseObject
{
private Conf $conf;
private TopicConf $topicConf;
private ?\RdKafka\Producer $producer = null;
private bool $isAck = true;
/**
* Producer constructor.
* @param string $topic
* @param string $groupId
* @param string $brokers
* @throws NotFindClassException
* @throws Exception
*/
public function __construct(public string $topic, string $groupId, string $brokers)
{
parent::__construct([]);
$this->conf = di(Conf::class);
$this->conf->set('metadata.broker.list', $brokers);
$this->conf->set('group.id', $groupId);
$this->conf->setErrorCb(function ($kafka, $err, $reason) {
$this->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason));
});
$this->topicConf = di(TopicConf::class);
}
/**
* @param array $params
* @param bool $isAck
* @throws Exception
*/
public function dispatch(array $params = [], bool $isAck = false)
{
$this->sendMessage([$params], $isAck);
}
/**
* @param string|null $key
* @param array $data
* @param bool $isAck
* @throws Exception
*/
public function batch(?string $key, array $data, bool $isAck = false)
{
$this->sendMessage($data, $key, $isAck);
}
/**
* @return \RdKafka\Producer
* @throws Exception
*/
private function getProducer(): \RdKafka\Producer
{
return Snowflake::getDi()->get(\RdKafka\Producer::class, [$this->conf]);
}
/**
* @param \RdKafka\Producer $producer
* @param $topic
* @param $isAck
* @return ProducerTopic
*/
private function getProducerTopic(\RdKafka\Producer $producer, $topic, $isAck): ProducerTopic
{
$this->topicConf->set('request.required.acks', $isAck ? '1' : '0');
return $producer->newTopic($topic, $this->topicConf);
}
/**
* @param array $message
* @param string $key
* @param bool $isAck
* @throws Exception
*/
private function sendMessage(array $message, string $key = '', bool $isAck = false)
{
$producer = $this->getProducer();
$producerTopic = $this->getProducerTopic($producer, $this->topic, $isAck);
if ($this->isAck) {
$this->flush($producer);
}
foreach ($message as $value) {
$producerTopic->produce(RD_KAFKA_PARTITION_UA, 0, swoole_serialize($value), $key);
$producer->poll(0);
}
$this->flush($producer);
}
/**
* @param \RdKafka\Producer $producer
*/
private function flush(\RdKafka\Producer $producer)
{
while ($producer->getOutQLen() > 0) {
$result = $producer->flush(100);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
break;
}
}
}
}
+23 -24
View File
@@ -19,29 +19,28 @@ use Snowflake\Application;
class KafkaImports extends Providers
{
/**
* @param Application $application
* @throws Exception
*/
public function onImport(Application $application)
{
/** @var Server $server */
$server = $application->get('server');
$application->set('kafka', ['class' => Producer::class]);
$kafka = SConfig::get('kafka');
if (empty($kafka) || !($kafka['enable'] ?? false)) {
return;
}
if (!extension_loaded('rdkafka')) {
return;
}
$kafkaServers = Config::get('kafka.consumers', []);
if (empty($kafkaServers)) {
return;
}
foreach ($kafkaServers as $kafkaServer) {
$server->addProcess(new Kafka($kafkaServer));
}
}
/**
* @param Application $application
* @throws Exception
*/
public function onImport(Application $application)
{
if (!extension_loaded('rdkafka')) {
return;
}
$kafka = SConfig::get('kafka', ['enable' => false]);
if (($kafka['enable'] ?? false) == false) {
return;
}
$kafkaServers = Config::get('kafka.consumers', []);
if (empty($kafkaServers)) {
return;
}
/** @var Server $server */
$server = $application->get('server');
foreach ($kafkaServers as $kafkaServer) {
$server->addProcess(new Kafka($kafkaServer));
}
}
}
+19 -67
View File
@@ -7,9 +7,12 @@ use Exception;
use RdKafka\Conf;
use RdKafka\ProducerTopic;
use RdKafka\TopicConf;
use ReflectionException;
use Snowflake\Abstracts\BaseObject;
use Snowflake\Abstracts\Component;
use Snowflake\Abstracts\Config;
use Snowflake\Exception\ConfigException;
use Snowflake\Exception\NotFindClassException;
use Snowflake\Snowflake;
/**
@@ -23,7 +26,7 @@ use Snowflake\Snowflake;
* @author $_SWANBR_AUTHOR_$
* +------------------------------------------------------------------------------
*/
class Producer extends Component
class Producer extends BaseObject
{
@@ -36,55 +39,33 @@ class Producer extends Component
/**
* Producer constructor.
* @param array $config
* @param string $topic
* @param string $groupId
* @param string $brokers
* @throws ReflectionException
* @throws NotFindClassException
* @throws Exception
*/
public function __construct(array $config = [])
public function __construct(public string $topic, string $groupId, string $brokers)
{
parent::__construct($config);
if (!class_exists(Conf::class)) {
return;
}
parent::__construct([]);
$this->conf = di(Conf::class);
$this->conf->set('metadata.broker.list', $brokers);
$this->conf->set('group.id', $groupId);
$this->conf->setErrorCb(function ($kafka, $err, $reason) {
$this->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason));
});
$this->topicConf = di(TopicConf::class);
}
/**
* @param string $servers
* @return Producer
*/
public function setBrokers(string $servers): static
{
$this->conf->set('metadata.broker.list', $servers);
return $this;
}
/**
* @param string $groupId
* @return Producer
*/
public function setGroupId(string $groupId): static
{
$this->conf->set('group.id', $groupId);
return $this;
}
/**
* @param string $topic
* @param array $params
* @param string|null $groupId
* @throws Exception
*/
public function dispatch(string $topic, array $params = [], string $groupId = null)
public function dispatch(array $params = [], string $groupId = null)
{
$this->beforePushMessage($topic, $groupId);
$this->sendMessage($topic, [$params]);
$this->sendMessage([$params]);
}
/**
@@ -110,56 +91,27 @@ class Producer extends Component
/**
* @param string $toPic
* @param string|null $key
* @param array $data
* @param string|null $groupId
* @throws ConfigException
* @throws Exception
*/
public function batch(string $toPic, ?string $key, array $data, ?string $groupId = null)
public function batch(?string $key, array $data, ?string $groupId = null)
{
$this->beforePushMessage($toPic, $groupId);
$this->sendMessage($toPic, $data, $key);
$this->sendMessage($data, $key);
}
/**
* @param $topic
* @param $groupId
* @throws ConfigException
* @throws Exception
*/
private function beforePushMessage($topic, $groupId): void
{
$consumers = Config::get('kafka.producers.' . $topic);
if (empty($consumers) || !is_array($consumers)) {
throw new Exception('You need set kafka.producers config');
}
if (!isset($consumers['brokers'])) {
throw new Exception('You need set brokers config.');
}
if (!empty($groupId)) {
$consumers['groupId'] = $groupId;
} else if (!isset($consumers['groupId'])) {
$consumers['groupId'] = $topic . ':' . Snowflake::localhost();
}
$this->setGroupId($consumers['groupId']);
$this->setBrokers($consumers['brokers']);
}
/**
* @param string $topic
* @param array $message
* @param string $key
* @throws Exception
*/
private function sendMessage(string $topic, array $message, string $key = '')
private function sendMessage(array $message, string $key = '')
{
$producer = $this->getProducer();
$producerTopic = $this->getProducerTopic($producer, $topic);
$producerTopic = $this->getProducerTopic($producer, $this->topic);
if ($this->isAck) {
$this->flush($producer);
}
+49 -63
View File
@@ -22,7 +22,6 @@ use HttpServer\Route\Router;
use HttpServer\Server;
use HttpServer\Shutdown;
use JetBrains\PhpStorm\Pure;
use Kafka\Producer;
use Kafka\KafkaProvider;
use ReflectionException;
use Server\ServerManager;
@@ -167,22 +166,22 @@ abstract class BaseApplication extends Component
}
/**
* @param $name
* @return mixed
* @throws \ReflectionException
* @throws \Snowflake\Exception\NotFindClassException
*/
/**
* @param $name
* @return mixed
* @throws \ReflectionException
* @throws \Snowflake\Exception\NotFindClassException
*/
public function __get($name): mixed
{
if ($this->has($name)) {
return $this->get($name);
}
return parent::__get($name); // TODO: Change the autogenerated stub
}
{
if ($this->has($name)) {
return $this->get($name);
}
return parent::__get($name); // TODO: Change the autogenerated stub
}
/**
/**
* @param $config
*
* @throws
@@ -262,17 +261,16 @@ abstract class BaseApplication extends Component
}
/**
* @param $name
* @return mixed
* @throws \ReflectionException
* @throws \Snowflake\Exception\NotFindClassException
*/
/**
* @param $name
* @return mixed
* @throws \ReflectionException
* @throws \Snowflake\Exception\NotFindClassException
*/
public function get($name): mixed
{
return di(Service::class)->get($name);
}
{
return di(Service::class)->get($name);
}
/**
@@ -302,16 +300,6 @@ abstract class BaseApplication extends Component
}
/**
* @return Producer
* @throws Exception
*/
public function getKafka(): Producer
{
return $this->get('kafka');
}
/**
* @return \Redis|Redis
* @throws Exception
@@ -461,41 +449,39 @@ abstract class BaseApplication extends Component
}
/**
* @param $array
* @throws \ReflectionException
* @throws \Snowflake\Exception\NotFindClassException
*/
/**
* @param $array
* @throws \ReflectionException
* @throws \Snowflake\Exception\NotFindClassException
*/
private function setComponents($array): void
{
di(Service::class)->setComponents($array);
}
{
di(Service::class)->setComponents($array);
}
/**
* @param $id
* @param $definition
* @throws \ReflectionException
* @throws \Snowflake\Exception\NotFindClassException
*/
/**
* @param $id
* @param $definition
* @throws \ReflectionException
* @throws \Snowflake\Exception\NotFindClassException
*/
public function set($id, $definition): void
{
di(Service::class)->set($id, $definition);
}
{
di(Service::class)->set($id, $definition);
}
/**
* @param $id
* @param $definition
* @throws \ReflectionException
* @throws \Snowflake\Exception\NotFindClassException
*/
/**
* @param $id
* @param $definition
* @throws \ReflectionException
* @throws \Snowflake\Exception\NotFindClassException
*/
public function has($id): bool
{
return di(Service::class)->has($id);
}
{
return di(Service::class)->has($id);
}
/**
@@ -514,7 +500,7 @@ abstract class BaseApplication extends Component
'router' => ['class' => Router::class],
'event' => ['class' => Event::class],
'redis' => ['class' => Redis::class],
'databases' => ['class' => \Database\Connection::class],
'databases' => ['class' => \Database\Connection::class],
'aop' => ['class' => Aop::class],
'input' => ['class' => HttpParams::class],
'header' => ['class' => HttpHeaders::class],
+2 -4
View File
@@ -6,8 +6,8 @@ namespace Snowflake\Abstracts;
use Annotation\Annotation as SAnnotation;
use Database\DatabasesProviders;
use HttpServer\Client\Client;
use HttpServer\Client\Curl;
use HttpServer\Client\Help\Client;
use HttpServer\Client\Help\Curl;
use HttpServer\Client\Http2;
use HttpServer\Http\Request;
use HttpServer\Http\Response;
@@ -15,7 +15,6 @@ use HttpServer\HttpFilter;
use HttpServer\Route\Router;
use HttpServer\Server;
use HttpServer\Shutdown;
use Kafka\Producer;
use Rpc\Producer as RPCProducer;
use Snowflake\Async;
use Snowflake\Cache\Redis;
@@ -42,7 +41,6 @@ use Snowflake\Pool\Pool;
* @property SAnnotation $annotation
* @property Http2 $http2
* @property BaseGoto $goto
* @property Producer $kafka
* @property Client $client
* @property \Database\Connection $databases
* @property Curl $curl