diff --git a/Database/ActiveRecord.php b/Database/ActiveRecord.php index c6a566ac..9eaebc6c 100644 --- a/Database/ActiveRecord.php +++ b/Database/ActiveRecord.php @@ -233,6 +233,7 @@ class ActiveRecord extends BaseActiveRecord * @return bool * @throws NotFindClassException * @throws ReflectionException + * @throws Exception */ public static function updateAll(mixed $condition, array $attributes = []): bool { diff --git a/Database/Affair/BeginTransaction.php b/Database/Affair/BeginTransaction.php new file mode 100644 index 00000000..2c3cee40 --- /dev/null +++ b/Database/Affair/BeginTransaction.php @@ -0,0 +1,10 @@ +beforeSave($this)) { - static::getDb()->enablingTransactions(); [$change, $condition, $fields] = $this->filtration_and_separation(); if (!$this->isNewExample) { return $this->updateInternal($fields, $condition, $change); diff --git a/Database/Command.php b/Database/Command.php index d6bfebeb..e47e881c 100644 --- a/Database/Command.php +++ b/Database/Command.php @@ -133,9 +133,11 @@ class Command extends Component $this->warning('Mysql:' . Json::encode([$this->sql, $this->params]) . (microtime(true) - $time)); } $this->prepare?->closeCursor(); - return $result; } catch (\Throwable $exception) { - return $this->addError($this->sql . '. error: ' . $exception->getMessage(), 'mysql'); + $result = $this->addError($this->sql . '. error: ' . $exception->getMessage(), 'mysql'); + } finally { + $this->db->release(); + return $result; } } diff --git a/Database/Connection.php b/Database/Connection.php index 8f3de2e4..878ddc30 100644 --- a/Database/Connection.php +++ b/Database/Connection.php @@ -11,14 +11,21 @@ declare(strict_types=1); namespace Database; +use Annotation\Inject; +use Database\Affair\BeginTransaction; +use Database\Affair\Commit; +use Database\Affair\Rollback; use Database\Mysql\Schema; use Exception; use JetBrains\PhpStorm\Pure; use PDO; use ReflectionException; +use Server\Events\OnWorkerExit; +use Server\Events\OnWorkerStop; use Snowflake\Abstracts\Component; use Snowflake\Abstracts\Config; use Snowflake\Event; +use Snowflake\Events\EventProvider; use Snowflake\Exception\NotFindClassException; use Snowflake\Snowflake; @@ -28,8 +35,6 @@ use Snowflake\Snowflake; */ class Connection extends Component { - const TRANSACTION_COMMIT = 'transaction::commit'; - const TRANSACTION_ROLLBACK = 'transaction::rollback'; public string $id = 'db'; public string $cds = ''; @@ -47,45 +52,46 @@ class Connection extends Component * enable database cache */ public bool $enableCache = false; + + + /** + * @var string + */ public string $cacheDriver = 'redis'; /** * @var array - * - * @example [ - * 'cds' => 'mysql:dbname=dbname;host=127.0.0.1', - * 'username' => 'root', - * 'password' => 'root' - * ] */ public array $slaveConfig = []; - private ?Schema $_schema = null; + + /** + * @var Schema + */ + #[Inject(Schema::class)] + public Schema $_schema; /** - * @throws Exception + * @var EventProvider + */ + #[Inject(EventProvider::class)] + public EventProvider $eventProvider; + + + /** + * execute by __construct */ public function init() { - Event::on(Event::SYSTEM_RESOURCE_CLEAN, [$this, 'disconnect']); - Event::on(Event::SYSTEM_RESOURCE_RELEASES, [$this, 'clear_connection']); + $this->eventProvider->on(OnWorkerStop::class, [$this, 'clear_connection'], 0); + $this->eventProvider->on(OnWorkerExit::class, [$this, 'clear_connection'], 0); + $this->eventProvider->on(BeginTransaction::class, [$this, 'beginTransaction'], 0); + $this->eventProvider->on(Rollback::class, [$this, 'rollback'], 0); + $this->eventProvider->on(Commit::class, [$this, 'commit'], 0); } - /** - * @throws Exception - */ - public function enablingTransactions() - { - if (!Db::transactionsActive()) { - return; - } - $this->beginTransaction(); - Event::on(Connection::TRANSACTION_ROLLBACK, [$this, 'rollback'], [], true); - Event::on(Connection::TRANSACTION_COMMIT, [$this, 'commit'], [], true); - } - /** * @param null $sql * @return PDO @@ -233,6 +239,7 @@ class Connection extends Component public function rollback() { $this->connections()->rollback($this->cds); + $this->release(); } /** @@ -242,6 +249,7 @@ class Connection extends Component public function commit() { $this->connections()->commit($this->cds); + $this->release(); } /** @@ -279,8 +287,11 @@ class Connection extends Component */ public function release() { + if (!Snowflake::isWorker() && !Snowflake::isProcess()) { + $this->clear_connection(); + return; + } $connections = $this->connections(); - $connections->release($this->cds, true); $connections->release($this->slaveConfig['cds'], false); } @@ -306,8 +317,8 @@ class Connection extends Component { $connections = $this->connections(); - $connections->release($this->cds, true); - $connections->release($this->slaveConfig['cds'], false); + $connections->disconnect($this->cds, true); + $connections->disconnect($this->slaveConfig['cds'], false); } diff --git a/Database/DatabasesProviders.php b/Database/DatabasesProviders.php index ddd4e4fa..788da13a 100644 --- a/Database/DatabasesProviders.php +++ b/Database/DatabasesProviders.php @@ -4,19 +4,15 @@ declare(strict_types=1); namespace Database; -use Annotation\IAnnotation; +use Annotation\Inject; use Exception; -use ReflectionException; -use Server\Constant; +use Server\Events\OnWorkerStart; +use Snowflake\Abstracts\Config; use Snowflake\Abstracts\Providers; use Snowflake\Application; -use Snowflake\Event; -use Snowflake\Exception\ComponentException; +use Snowflake\Events\EventProvider; use Snowflake\Exception\ConfigException; -use Snowflake\Exception\NotFindClassException; -use Snowflake\Exception\NotFindPropertyException; use Snowflake\Snowflake; -use Snowflake\Abstracts\Config; /** * Class DatabasesProviders @@ -25,89 +21,96 @@ use Snowflake\Abstracts\Config; class DatabasesProviders extends Providers { - private array $_pooLength = ['min' => 0, 'max' => 1]; + private array $_pooLength = ['min' => 0, 'max' => 1]; - /** - * @param Application $application - * @throws Exception - */ - public function onImport(Application $application) - { - $application->set('db', $this); - - $this->_pooLength = Config::get('databases.pool', ['min' => 0, 'max' => 1]); - - Event::on(Event::SERVER_TASK_START, [$this, 'createPool']); - } + /** + * @var EventProvider + */ + #[Inject(EventProvider::class)] + public EventProvider $eventProvider; - /** - * @param $name - * @return Connection - * @throws ConfigException - * @throws Exception - */ - public function get($name): Connection - { - $application = Snowflake::app(); - if (!$application->has('databases.' . $name)) { - $application->set('databases.' . $name, $this->_settings($this->getConfig($name))); - } - return $application->get('databases.' . $name); - } + /** + * @param Application $application + * @throws Exception + */ + public function onImport(Application $application) + { + $application->set('db', $this); + + $this->_pooLength = Config::get('databases.pool', ['min' => 0, 'max' => 1]); + + $this->eventProvider->on(OnWorkerStart::class, [$this, 'createPool']); + } - /** - * @throws ConfigException - * @throws Exception - */ - public function createPool() - { - $databases = Config::get('databases.connections', []); - if (empty($databases)) { - return; - } - $application = Snowflake::app(); - foreach ($databases as $name => $database) { - /** @var Connection $connection */ - $application->set('databases.' . $name, $this->_settings($database)); - $application->get('databases.' . $name)->fill(); - } - } + /** + * @param $name + * @return Connection + * @throws ConfigException + * @throws Exception + */ + public function get($name): Connection + { + $application = Snowflake::app(); + if (!$application->has('databases.' . $name)) { + $application->set('databases.' . $name, $this->_settings($this->getConfig($name))); + } + return $application->get('databases.' . $name); + } - /** - * @param $database - * @return array - */ - private function _settings($database): array - { - return [ - 'class' => Connection::class, - 'id' => $database['id'], - 'cds' => $database['cds'], - 'username' => $database['username'], - 'password' => $database['password'], - 'tablePrefix' => $database['tablePrefix'], - 'database' => $database['database'], - 'maxNumber' => $this->_pooLength['max'], - 'minNumber' => $this->_pooLength['min'], - 'charset' => $database['charset'] ?? 'utf8mb4', - 'slaveConfig' => $database['slaveConfig'] - ]; - } + /** + * @throws ConfigException + * @throws Exception + */ + public function createPool(OnWorkerStart $onWorkerStart) + { + $databases = Config::get('databases.connections', []); + if (empty($databases)) { + return; + } + $application = Snowflake::app(); + foreach ($databases as $name => $database) { + /** @var Connection $connection */ + $application->set('databases.' . $name, $this->_settings($database)); + $application->get('databases.' . $name)->fill(); + } + } - /** - * @param $name - * @return mixed - * @throws ConfigException - */ - public function getConfig($name): mixed - { - return Config::get('databases.connections.' . $name, null, true); - } + /** + * @param $database + * @return array + */ + private function _settings($database): array + { + return [ + 'class' => Connection::class, + 'id' => $database['id'], + 'cds' => $database['cds'], + 'username' => $database['username'], + 'password' => $database['password'], + 'tablePrefix' => $database['tablePrefix'], + 'database' => $database['database'], + 'maxNumber' => $this->_pooLength['max'], + 'minNumber' => $this->_pooLength['min'], + 'charset' => $database['charset'] ?? 'utf8mb4', + 'slaveConfig' => $database['slaveConfig'] + ]; + } + + + /** + * @param $name + * @return mixed + * @throws ConfigException + */ + public function getConfig($name): mixed + { + return Config::get('databases.connections.' . $name, null, true); + } } diff --git a/Database/Db.php b/Database/Db.php index ab56348d..30961361 100644 --- a/Database/Db.php +++ b/Database/Db.php @@ -9,10 +9,13 @@ declare(strict_types=1); namespace Database; +use Database\Affair\BeginTransaction; +use Database\Affair\Commit; +use Database\Affair\Rollback; use Database\Traits\QueryTrait; use Exception; use Snowflake\Abstracts\Config; -use Snowflake\Event; +use Snowflake\Events\EventDispatch; use Snowflake\Exception\ConfigException; /** @@ -39,19 +42,21 @@ class Db implements ISqlBuilder */ public static function beginTransaction() { + if (!static::transactionsActive()) { + di(EventDispatch::class)->dispatch(new BeginTransaction()); + } static::$_inTransaction = true; } + /** * @throws Exception */ public static function commit() { - if (!static::transactionsActive()) { - return; + if (static::transactionsActive()) { + di(EventDispatch::class)->dispatch(new Commit()); } - Event::trigger(Connection::TRANSACTION_COMMIT); - Event::offName(Connection::TRANSACTION_COMMIT); static::$_inTransaction = false; } @@ -61,11 +66,9 @@ class Db implements ISqlBuilder */ public static function rollback() { - if (!static::transactionsActive()) { - return; + if (static::transactionsActive()) { + di(EventDispatch::class)->dispatch(new Rollback()); } - Event::trigger(Connection::TRANSACTION_ROLLBACK); - Event::offName(Connection::TRANSACTION_ROLLBACK); static::$_inTransaction = false; } diff --git a/HttpServer/Client/Http2.php b/HttpServer/Client/Http2.php index 7080d72f..39bff535 100644 --- a/HttpServer/Client/Http2.php +++ b/HttpServer/Client/Http2.php @@ -11,7 +11,6 @@ use Snowflake\Channel; use Snowflake\Core\Json; use Snowflake\Core\Xml; use Snowflake\Event; -use Snowflake\Snowflake; use Swoole\Coroutine\Http2\Client as H2Client; use Swoole\Http2\Request; use Swoole\Http2\Response; @@ -28,16 +27,11 @@ class Http2 extends Component private array $_clients = []; - private Channel $channel; - - /** * @throws Exception */ public function init() { - $this->channel = Snowflake::getApp('channel'); - Event::on(Event::SYSTEM_RESOURCE_RELEASES, [$this, 'releases']); Event::on(Event::SYSTEM_RESOURCE_CLEAN, [$this, 'clean']); } @@ -48,11 +42,6 @@ class Http2 extends Component */ public function releases() { - foreach ($this->_clients as $name => $client) { - /** @var H2Client $client */ - $client->close(); - $this->channel->push($client, 'http2.' . $name); - } $this->_clients = []; } @@ -111,7 +100,7 @@ class Http2 extends Component * @return Result * @throws Exception */ - public function get($domain, $path, $params = [], $timeout = -1): Result + public function get($domain, $path, array $params = [], int $timeout = -1): Result { $request = $this->dispatch($domain, $path, 'GET', $params, $timeout); @@ -127,7 +116,7 @@ class Http2 extends Component * @return Result * @throws Exception */ - public function post($domain, $path, $params = [], $timeout = -1): Result + public function post($domain, $path, array $params = [], int $timeout = -1): Result { $request = $this->dispatch($domain, $path, 'POST', $params, $timeout); @@ -143,7 +132,7 @@ class Http2 extends Component * @return Result * @throws Exception */ - public function upload($domain, $path, $params = [], $timeout = -1): Result + public function upload($domain, $path, array $params = [], int $timeout = -1): Result { $request = $this->dispatch($domain, $path, 'POST', $params, $timeout, true); @@ -159,7 +148,7 @@ class Http2 extends Component * @return Result * @throws Exception */ - public function delete($domain, $path, $params = [], $timeout = -1): Result + public function delete($domain, $path, array $params = [], int $timeout = -1): Result { $request = $this->dispatch($domain, $path, 'DELETE', $params, $timeout); @@ -177,7 +166,7 @@ class Http2 extends Component * @return mixed * @throws Exception */ - private function dispatch($domain, $path, $method, $params = [], $timeout = -1, $isUpload = false): mixed + private function dispatch($domain, $path, $method, array $params = [], int $timeout = -1, bool $isUpload = false): mixed { [$domain, $isSsl] = $this->clear($domain); @@ -185,7 +174,6 @@ class Http2 extends Component $request->headers = array_merge($request->headers, [ 'Host' => $domain ]); - defer(fn() => $this->channel->push($request, 'request.' . $method . $path)); return $this->doRequest($request, $domain, $isSsl, $timeout); } @@ -215,7 +203,6 @@ class Http2 extends Component private function doRequest(Request $request, $domain, $ssl, $timeout): mixed { $client = $this->getClient($domain, $ssl, $timeout); - defer(fn() => $this->channel->push($client, 'http2.' . $domain)); $client->send($request); if (Context::getContext('http2isRecv') === false) { return null; @@ -275,18 +262,15 @@ class Http2 extends Component * @return Request * @throws Exception */ - public function getRequest($path, $method, $params, $isUpload = false): Request + public function getRequest($path, $method, $params, bool $isUpload = false): Request { if (!str_starts_with($path, '/')) { $path = '/' . $path; } - $channel = Snowflake::app()->getChannel(); - $request = $channel->pop('request.' . $method . $path, function () use ($path, $method) { - $request = new Request(); - $request->method = $method; - $request->path = $path; - return $request; - }); + + $request = new Request(); + $request->method = $method; + $request->path = $path; if ($method === 'GET') { $request->path .= '?' . http_build_query($params); } else { @@ -304,16 +288,12 @@ class Http2 extends Component * @return H2Client * @throws Exception */ - private function getClient($domain, $isSsl = false, $timeout = -1): H2Client + private function getClient($domain, bool $isSsl = false, int $timeout = -1): H2Client { if (isset($this->_clients[$domain])) { return $this->_clients[$domain]; } - $pool = Snowflake::app()->getChannel(); - /** @var H2Client $client */ - $client = $pool->pop('http2.' . $domain, function () use ($domain, $isSsl, $timeout) { - return $this->newRequest($domain, $isSsl, $timeout); - }); + $client = $this->newRequest($domain, $isSsl, $timeout); if ((!$client->connected || !$client->ping()) && !$client->connect()) { throw new Exception($client->errMsg, $client->errCode); } diff --git a/HttpServer/Http/Context.php b/HttpServer/Http/Context.php index f52ccd7c..aef21255 100644 --- a/HttpServer/Http/Context.php +++ b/HttpServer/Http/Context.php @@ -19,25 +19,27 @@ class Context extends BaseContext /** * @param $id * @param $context + * @param null $coroutineId * @return mixed */ - public static function setContext($id, $context): mixed + public static function setContext($id, $context, $coroutineId = null): mixed { if (Coroutine::getCid() === -1) { return static::$_contents[$id] = $context; } - return Coroutine::getContext()[$id] = $context; + return Coroutine::getContext($coroutineId)[$id] = $context; } /** * @param $id * @param int $value + * @param null $coroutineId * @return bool|int */ - public static function increment($id, int $value = 1): bool|int + public static function increment($id, int $value = 1, $coroutineId = null): bool|int { - if (!isset(Coroutine::getContext()[$id])) { - return Coroutine::getContext()[$id] += $value; + if (!isset(Coroutine::getContext($coroutineId)[$id])) { + return Coroutine::getContext($coroutineId)[$id] += $value; } return false; } @@ -45,15 +47,16 @@ class Context extends BaseContext /** * @param $id * @param int $value + * @param null $coroutineId * @return bool|int */ - public static function decrement($id, int $value = 1): bool|int + public static function decrement($id, int $value = 1, $coroutineId = null): bool|int { if (!static::hasContext($id)) { return false; } - if (isset(Coroutine::getContext()[$id])) { - return Coroutine::getContext()[$id] -= $value; + if (isset(Coroutine::getContext($coroutineId)[$id])) { + return Coroutine::getContext($coroutineId)[$id] -= $value; } return false; } @@ -61,25 +64,27 @@ class Context extends BaseContext /** * @param $id * @param null $default + * @param null $coroutineId * @return mixed */ - public static function getContext($id, $default = null): mixed + public static function getContext($id, $default = null, $coroutineId = null): mixed { if (Coroutine::getCid() === -1) { return static::loadByStatic($id, $default); } - return static::loadByContext($id, $default); + return static::loadByContext($id, $default, $coroutineId); } /** * @param $id * @param null $default + * @param null $coroutineId * @return mixed */ - private static function loadByContext($id, $default = null): mixed + private static function loadByContext($id, $default = null, $coroutineId = null): mixed { - $data = Coroutine::getContext()[$id] ?? null; + $data = Coroutine::getContext($coroutineId)[$id] ?? null; if ($data === null) { return $default; } @@ -103,12 +108,13 @@ class Context extends BaseContext /** + * @param null $coroutineId * @return mixed */ - public static function getAllContext(): mixed + public static function getAllContext($coroutineId = null): mixed { if (Coroutine::getCid() === -1) { - return Coroutine::getContext() ?? []; + return Coroutine::getContext($coroutineId) ?? []; } else { return static::$_contents ?? []; } @@ -116,16 +122,17 @@ class Context extends BaseContext /** * @param string $id + * @param null $coroutineId */ - public static function remove(string $id) + public static function remove(string $id, $coroutineId = null) { - if (!static::hasContext($id)) { + if (!static::hasContext($id, $coroutineId)) { return; } if (Coroutine::getCid() === -1) { unset(static::$_contents[$id]); } else { - unset(Coroutine::getContext()[$id]); + unset(Coroutine::getContext($coroutineId)[$id]); } } @@ -134,12 +141,12 @@ class Context extends BaseContext * @param null $key * @return bool */ - public static function hasContext($id, $key = null): bool + public static function hasContext($id, $key = null, $coroutineId = null): bool { if (Coroutine::getCid() === -1) { return static::searchByStatic($id, $key); } - return static::searchByCoroutine($id, $key); + return static::searchByCoroutine($id, $key, $coroutineId); } @@ -163,15 +170,16 @@ class Context extends BaseContext /** * @param $id * @param null $key + * @param null $coroutineId * @return bool */ - private static function searchByCoroutine($id, $key = null): bool + private static function searchByCoroutine($id, $key = null, $coroutineId = null): bool { - if (!isset(Coroutine::getContext()[$id])) { + if (!isset(Coroutine::getContext($coroutineId)[$id])) { return false; } if ($key !== null) { - return isset((Coroutine::getContext()[$id] ?? [])[$key]); + return isset((Coroutine::getContext($coroutineId)[$id] ?? [])[$key]); } return true; } diff --git a/Kafka/Producer.php b/Kafka/Producer.php index 909dfe29..80c52f56 100644 --- a/Kafka/Producer.php +++ b/Kafka/Producer.php @@ -7,12 +7,9 @@ use Exception; use RdKafka\Conf; use RdKafka\ProducerTopic; use RdKafka\TopicConf; -use ReflectionException; use Snowflake\Abstracts\Component; use Snowflake\Abstracts\Config; -use Snowflake\Event; use Snowflake\Exception\ConfigException; -use Snowflake\Exception\NotFindClassException; use Snowflake\Snowflake; /** @@ -30,77 +27,74 @@ class Producer extends Component { - private Conf $conf; - private TopicConf $topicConf; + private Conf $conf; + private TopicConf $topicConf; - private ?\RdKafka\Producer $producer = null; + private ?\RdKafka\Producer $producer = null; - private bool $isAck = true; + private bool $isAck = true; - /** - * Producer constructor. - * @param array $config - * @throws Exception - */ - public function __construct(array $config = []) - { - parent::__construct($config); - if (!class_exists(Conf::class)) { - return; - } - $this->conf = new Conf(); - $this->topicConf = new TopicConf(); - $this->conf->setErrorCb(function ($kafka, $err, $reason) { - $this->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason)); - }); - } + /** + * Producer constructor. + * @param array $config + * @throws Exception + */ + public function __construct(array $config = []) + { + parent::__construct($config); + if (!class_exists(Conf::class)) { + return; + } + $this->conf = new Conf(); + $this->topicConf = new TopicConf(); + $this->conf->setErrorCb(function ($kafka, $err, $reason) { + $this->error(sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason)); + }); + } /** * @param string $servers * @return Producer */ - public function setBrokers(string $servers): static - { - $this->conf->set('metadata.broker.list', $servers); - return $this; - } + public function setBrokers(string $servers): static + { + $this->conf->set('metadata.broker.list', $servers); + return $this; + } - /** - * @param string $groupId - * @return Producer - */ - public function setGroupId(string $groupId): static - { - $this->conf->set('group.id', $groupId); - return $this; - } + /** + * @param string $groupId + * @return Producer + */ + public function setGroupId(string $groupId): static + { + $this->conf->set('group.id', $groupId); + return $this; + } - /** - * @param string $topic - * @param array $params - * @param string|null $groupId - * @throws Exception - */ - public function dispatch(string $topic, array $params = [], string $groupId = null) - { - $this->beforePushMessage($topic, $groupId); - $this->sendMessage($topic, [$params]); - } + /** + * @param string $topic + * @param array $params + * @param string|null $groupId + * @throws Exception + */ + public function dispatch(string $topic, array $params = [], string $groupId = null) + { + $this->beforePushMessage($topic, $groupId); + $this->sendMessage($topic, [$params]); + } - /** - * @return \RdKafka\Producer - * @throws Exception - */ - private function getProducer(): \RdKafka\Producer - { - $pool = Snowflake::app()->getChannel(); - return $pool->pop(\RdKafka\Producer::class, function () { - return Snowflake::createObject(\RdKafka\Producer::class, [$this->conf]); - }); - } + /** + * @return \RdKafka\Producer + * @throws Exception + */ + private function getProducer(): \RdKafka\Producer + { + return Snowflake::createObject(\RdKafka\Producer::class, [$this->conf]); + } /** @@ -109,13 +103,10 @@ class Producer extends Component * @return ProducerTopic * @throws Exception */ - private function getProducerTopic(\RdKafka\Producer $producer, $topic): ProducerTopic - { - $pool = Snowflake::app()->getChannel(); - return $pool->pop($topic . '::' . ProducerTopic::class, function () use ($producer, $topic) { - return $producer->newTopic($topic, $this->topicConf); - }); - } + private function getProducerTopic(\RdKafka\Producer $producer, $topic): ProducerTopic + { + return $producer->newTopic($topic, $this->topicConf); + } /** @@ -126,12 +117,12 @@ class Producer extends Component * @throws ConfigException * @throws Exception */ - public function batch(string $toPic, ?string $key, array $data, ?string $groupId = null) - { - $this->beforePushMessage($toPic, $groupId); + public function batch(string $toPic, ?string $key, array $data, ?string $groupId = null) + { + $this->beforePushMessage($toPic, $groupId); - $this->sendMessage($toPic, $data, $key); - } + $this->sendMessage($toPic, $data, $key); + } /** @@ -140,23 +131,23 @@ class Producer extends Component * @throws ConfigException * @throws Exception */ - private function beforePushMessage($topic, $groupId): void - { - $consumers = Config::get('kafka.producers.' . $topic); - if (empty($consumers) || !is_array($consumers)) { - throw new Exception('You need set kafka.producers config'); - } - if (!isset($consumers['brokers'])) { - throw new Exception('You need set brokers config.'); - } - if (!empty($groupId)) { - $consumers['groupId'] = $groupId; - } else if (!isset($consumers['groupId'])) { - $consumers['groupId'] = $topic . ':' . Snowflake::localhost(); - } - $this->setGroupId($consumers['groupId']); - $this->setBrokers($consumers['brokers']); - } + private function beforePushMessage($topic, $groupId): void + { + $consumers = Config::get('kafka.producers.' . $topic); + if (empty($consumers) || !is_array($consumers)) { + throw new Exception('You need set kafka.producers config'); + } + if (!isset($consumers['brokers'])) { + throw new Exception('You need set brokers config.'); + } + if (!empty($groupId)) { + $consumers['groupId'] = $groupId; + } else if (!isset($consumers['groupId'])) { + $consumers['groupId'] = $topic . ':' . Snowflake::localhost(); + } + $this->setGroupId($consumers['groupId']); + $this->setBrokers($consumers['brokers']); + } /** @@ -165,58 +156,42 @@ class Producer extends Component * @param string $key * @throws Exception */ - private function sendMessage(string $topic, array $message, string $key = '') - { - $producer = $this->getProducer(); - $producerTopic = $this->getProducerTopic($producer, $topic); - - if ($this->isAck) { - $this->flush($producer); - } - - foreach ($message as $value) { - $producerTopic->produce(RD_KAFKA_PARTITION_UA, 0, swoole_serialize($value), $key); - $producer->poll(0); - } - $this->flush($producer); - $this->recover($topic, $producer, $producerTopic); - } - - - /** - * @param bool $ack - */ - public function setAsk(bool $ack){ - $this->isAck = $ack; - $this->topicConf->set('request.required.acks', $this->isAck ? '1' : '0'); - } + private function sendMessage(string $topic, array $message, string $key = '') + { + $producer = $this->getProducer(); + $producerTopic = $this->getProducerTopic($producer, $topic); + if ($this->isAck) { + $this->flush($producer); + } + foreach ($message as $value) { + $producerTopic->produce(RD_KAFKA_PARTITION_UA, 0, swoole_serialize($value), $key); + $producer->poll(0); + } + $this->flush($producer); + } /** - * @param string $topic - * @param \RdKafka\Producer $producer - * @param ProducerTopic $producerTopic - * @throws Exception + * @param bool $ack */ - private function recover(string $topic, \RdKafka\Producer $producer, ProducerTopic $producerTopic) - { - $channel = Snowflake::app()->getChannel(); - $channel->push($producerTopic, $topic . '::' . ProducerTopic::class); - $channel->push($producer, \RdKafka\Producer::class); - } + public function setAsk(bool $ack) + { + $this->isAck = $ack; + $this->topicConf->set('request.required.acks', $this->isAck ? '1' : '0'); + } - /** - * @param \RdKafka\Producer $producer - */ - public function flush(\RdKafka\Producer $producer) - { - while ($producer->getOutQLen() > 0) { - $result = $producer->flush(100); - if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) { - break; - } - } - } + /** + * @param \RdKafka\Producer $producer + */ + public function flush(\RdKafka\Producer $producer) + { + while ($producer->getOutQLen() > 0) { + $result = $producer->flush(100); + if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) { + break; + } + } + } } diff --git a/Rpc/Client.php b/Rpc/Client.php index ee639652..abb91947 100644 --- a/Rpc/Client.php +++ b/Rpc/Client.php @@ -8,7 +8,6 @@ use Exception; use Snowflake\Abstracts\Component; use Snowflake\Channel; use Snowflake\Core\Json; -use Snowflake\Snowflake; use Swoole\Coroutine\Client as CClient; @@ -69,10 +68,6 @@ class Client extends Component if ($port < 0) { return $this; } - - /** @var Channel $channel */ - $channel = Snowflake::app()->get('channel'); - $channel->push($this->client, $host . $port . CClient::class); $this->client = null; return $this; } @@ -114,26 +109,19 @@ class Client extends Component */ public function getClient(): CClient { - /** @var Channel $channel */ - $channel = Snowflake::app()->get('channel'); - - $host = $this->config['host'] ?? '127.0.0.1'; $port = $this->config['port'] ?? 0; if ($port < 0) { throw new Exception('Related service not have port(404)'); } - - return $channel->pop($host . $port . CClient::class, function () { - $client = new CClient($this->config['mode'] ?? SWOOLE_SOCK_TCP); - $client->set([ - 'timeout' => 0.5, - 'connect_timeout' => 1.0, - 'write_timeout' => 10.0, - 'read_timeout' => 0.5, - 'open_tcp_keepalive' => true, - ]); - return $client; - }); + $client = new CClient($this->config['mode'] ?? SWOOLE_SOCK_TCP); + $client->set([ + 'timeout' => 0.5, + 'connect_timeout' => 1.0, + 'write_timeout' => 10.0, + 'read_timeout' => 0.5, + 'open_tcp_keepalive' => true, + ]); + return $client; } diff --git a/Rpc/Service.php b/Rpc/Service.php index 59accf2b..1b8a57d0 100644 --- a/Rpc/Service.php +++ b/Rpc/Service.php @@ -3,14 +3,17 @@ namespace Rpc; +use Annotation\Inject; use Exception; use HttpServer\Http\Context; -use HttpServer\Http\Request; use HttpServer\Route\Router; use Server\Constant; +use Server\Events\OnAfterRequest; use Snowflake\Core\Json; -use Snowflake\Event; +use Snowflake\Events\EventDispatch; +use Snowflake\Events\EventProvider; use Snowflake\Snowflake; +use Swoole\Http\Request; use Swoole\Server; use function Swoole\Coroutine\defer; @@ -28,6 +31,16 @@ class Service extends \Server\Abstracts\Server private Router $router; + /** @var EventProvider */ + #[Inject(EventProvider::class)] + public EventProvider $eventProvider; + + + /** @var EventDispatch */ + #[Inject(EventDispatch::class)] + public EventDispatch $eventDispatch; + + /** * @throws Exception */ @@ -45,7 +58,7 @@ class Service extends \Server\Abstracts\Server */ public function onConnect(Server $server, int $fd, int $reactorId) { - defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES)); + defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest())); $this->runEvent(Constant::CONNECT, null, [$server, $fd, $reactorId]); } @@ -59,7 +72,7 @@ class Service extends \Server\Abstracts\Server */ public function onClose(Server $server, int $fd) { - defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES)); + defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest())); $this->runEvent(Constant::CLOSE, null, [$server, $fd]); } @@ -73,7 +86,7 @@ class Service extends \Server\Abstracts\Server */ public function onDisconnect(Server $server, int $fd) { - defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES)); + defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest())); $this->runEvent(Constant::DISCONNECT, null, [$server, $fd]); } @@ -88,11 +101,11 @@ class Service extends \Server\Abstracts\Server */ public function onReceive(Server $server, int $fd, int $reID, string $data) { - defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES)); + defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest())); try { $client = $server->getClientInfo($fd, $reID); - $request = $this->requestSpl((int)$client['server_port'], $data); + $request = $this->requestSpl((int)$client['server_port'], $data, $fd); $result = $this->router->find_path($request)?->dispatch(); @@ -111,7 +124,7 @@ class Service extends \Server\Abstracts\Server */ public function onPacket(Server $server, string $data, array $client) { - defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES)); + defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest())); try { $request = $this->requestSpl((int)$client['server_port'], $data); @@ -127,10 +140,11 @@ class Service extends \Server\Abstracts\Server /** * @param int $server_port * @param string $data + * @param int $fd * @return mixed * @throws Exception */ - public function requestSpl(int $server_port, string $data): mixed + public function requestSpl(int $server_port, string $data, int $fd = 0): \HttpServer\Http\Request { $sRequest = new Request(); @@ -143,11 +157,14 @@ class Service extends \Server\Abstracts\Server throw new Exception('Protocol format error.'); } - $sRequest->params->setPosts($data); - $sRequest->headers->setRequestUri('rpc/p' . $server_port . '/' . ltrim($cmd, '/')); - $sRequest->headers->setRequestMethod('rpc'); + $sRequest->fd = $fd; + $sRequest->post = $data; + $sRequest->header['request_uri'] = 'rpc/p' . $server_port . '/' . ltrim($cmd, '/'); + $sRequest->header['request_method'] = 'rpc'; - return Context::setContext(Request::class, $sRequest); + Context::setContext(Request::class, $sRequest); + + return \request(); } diff --git a/Server/Events/OnAfterRequest.php b/Server/Events/OnAfterRequest.php new file mode 100644 index 00000000..a6b99216 --- /dev/null +++ b/Server/Events/OnAfterRequest.php @@ -0,0 +1,8 @@ +router = Snowflake::getApp('router'); - $this->response = di(\HttpServer\Http\Response::class); - parent::__construct(); - } + /** @var EventDispatch */ + #[Inject(EventDispatch::class)] + public EventDispatch $eventDispatch; + /** * UDPServerListener constructor. @@ -93,6 +95,8 @@ class HTTPServerListener extends Abstracts\Server $this->router->dispatch(HSRequest::create($request)); } catch (Error | Throwable $exception) { $this->response->send(jTraceEx($exception), 500); + } finally { + $this->eventDispatch->dispatch(new OnAfterRequest()); } } diff --git a/Server/Manager/ServerDefaultEvent.php b/Server/Manager/ServerDefaultEvent.php index ce2c83d9..0ece113d 100644 --- a/Server/Manager/ServerDefaultEvent.php +++ b/Server/Manager/ServerDefaultEvent.php @@ -2,11 +2,15 @@ namespace Server\Manager; +use Annotation\Inject; use Exception; use Server\Abstracts\Server; use Server\Constant; +use Server\Events\OnAfterRequest; use Server\SInterface\PipeMessage; use Snowflake\Event; +use Snowflake\Events\EventDispatch; +use Snowflake\Exception\ConfigException; /** @@ -17,9 +21,14 @@ class ServerDefaultEvent extends Server { + /** @var EventDispatch */ + #[Inject(EventDispatch::class)] + public EventDispatch $eventDispatch; + + /** * @param \Swoole\Server $server - * @throws \Snowflake\Exception\ConfigException + * @throws ConfigException */ public function onStart(\Swoole\Server $server) { @@ -49,7 +58,7 @@ class ServerDefaultEvent extends Server if (!is_object($message) || !($message instanceof PipeMessage)) { return; } - defer(fn() => fire(Event::SYSTEM_RESOURCE_RELEASES)); + defer(fn() => $this->eventDispatch->dispatch(new OnAfterRequest())); $this->runEvent(Constant::PIPE_MESSAGE, function (\Swoole\Server $server, $src_worker_id, $message) { call_user_func([$message, 'execute']); diff --git a/Server/TCPServerListener.php b/Server/TCPServerListener.php index fd74da7c..8323792f 100644 --- a/Server/TCPServerListener.php +++ b/Server/TCPServerListener.php @@ -2,9 +2,11 @@ namespace Server; +use Annotation\Inject; use Exception; -use ReflectionException; +use Server\Events\OnAfterRequest; use Snowflake\Event; +use Snowflake\Events\EventDispatch; use Swoole\Server; use Swoole\Server\Port; @@ -21,6 +23,11 @@ class TCPServerListener extends Abstracts\Server protected static bool|Port $_tcp; + /** @var EventDispatch */ + #[Inject(EventDispatch::class)] + public EventDispatch $eventDispatch; + + /** * UDPServerListener constructor. * @param Server|Port $server @@ -51,7 +58,7 @@ class TCPServerListener extends Abstracts\Server { $this->runEvent(Constant::CONNECT, null, [$server, $fd]); - $this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES); + $this->eventDispatch->dispatch(new OnAfterRequest()); } @@ -66,7 +73,7 @@ class TCPServerListener extends Abstracts\Server { $this->runEvent(Constant::RECEIVE, null, [$server, $fd, $reactor_id, $data]); - $this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES); + $this->eventDispatch->dispatch(new OnAfterRequest()); } @@ -79,7 +86,8 @@ class TCPServerListener extends Abstracts\Server { $this->runEvent(Constant::CLOSE, null, [$server, $fd]); - $this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES); + + $this->eventDispatch->dispatch(new OnAfterRequest()); } } diff --git a/Server/UDPServerListener.php b/Server/UDPServerListener.php index 27b360df..7286509b 100644 --- a/Server/UDPServerListener.php +++ b/Server/UDPServerListener.php @@ -2,9 +2,12 @@ namespace Server; +use Annotation\Inject; use Exception; use ReflectionException; +use Server\Events\OnAfterRequest; use Snowflake\Event; +use Snowflake\Events\EventDispatch; use Snowflake\Exception\NotFindClassException; use Swoole\Server; use Swoole\Server\Port; @@ -17,7 +20,10 @@ use Swoole\Server\Port; class UDPServerListener extends Abstracts\Server { - protected static bool|Port $_udp; + + /** @var EventDispatch */ + #[Inject(EventDispatch::class)] + public EventDispatch $eventDispatch; use ListenerHelper; @@ -50,7 +56,7 @@ class UDPServerListener extends Abstracts\Server { $this->runEvent(Constant::MESSAGE, null, [$server, $data, $clientInfo]); - $this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES); + $this->eventDispatch->dispatch(new OnAfterRequest()); } } diff --git a/Server/WebSocketServerListener.php b/Server/WebSocketServerListener.php index ec54126d..80ed3fb8 100644 --- a/Server/WebSocketServerListener.php +++ b/Server/WebSocketServerListener.php @@ -2,9 +2,11 @@ namespace Server; +use Annotation\Inject; use Exception; use ReflectionException; -use Snowflake\Event; +use Server\Events\OnAfterRequest; +use Snowflake\Events\EventDispatch; use Snowflake\Exception\NotFindClassException; use Snowflake\Snowflake; use Swoole\Http\Request; @@ -25,6 +27,11 @@ class WebSocketServerListener extends Abstracts\Server use ListenerHelper; + + #[Inject(EventDispatch::class)] + public EventDispatch $eventDispatch; + + /** * @param mixed $server * @param array|null $settings @@ -88,7 +95,7 @@ class WebSocketServerListener extends Abstracts\Server } $this->runEvent(Constant::HANDSHAKE, fn() => $this->disconnect($request, $response), [$request, $response]); - $this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES); + $this->eventDispatch->dispatch(new OnAfterRequest()); } @@ -112,7 +119,7 @@ class WebSocketServerListener extends Abstracts\Server { $this->runEvent(Constant::CONNECT, fn() => $server->confirm($fd), [$server, $fd]); - $this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES); + $this->eventDispatch->dispatch(new OnAfterRequest()); } @@ -125,7 +132,7 @@ class WebSocketServerListener extends Abstracts\Server { $this->runEvent(Constant::MESSAGE, fn() => $server->push($frame->fd, '.'), [$server, $frame]); - $this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES); + $this->eventDispatch->dispatch(new OnAfterRequest()); } @@ -138,7 +145,7 @@ class WebSocketServerListener extends Abstracts\Server { $this->runEvent(Constant::CLOSE, fn() => $server->confirm($fd), [$server, $fd]); - $this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES); + $this->eventDispatch->dispatch(new OnAfterRequest()); } @@ -151,7 +158,7 @@ class WebSocketServerListener extends Abstracts\Server { $this->runEvent(Constant::DISCONNECT, fn() => $server->confirm($fd), [$server, $fd]); - $this->_event->dispatch(Event::SYSTEM_RESOURCE_RELEASES); + $this->eventDispatch->dispatch(new OnAfterRequest()); } } diff --git a/Server/Worker/ServerWorker.php b/Server/Worker/ServerWorker.php index 85cd40d3..6187880d 100644 --- a/Server/Worker/ServerWorker.php +++ b/Server/Worker/ServerWorker.php @@ -3,13 +3,17 @@ namespace Server\Worker; use Annotation\Annotation; +use Annotation\Inject; use Exception; use ReflectionException; -use Server\ApplicationStore; use Server\Constant; +use Server\Events\OnWorkerError; +use Server\Events\OnWorkerExit; +use Server\Events\OnWorkerStart; +use Server\Events\OnWorkerStop; use Snowflake\Abstracts\Config; use Snowflake\Core\Help; -use Snowflake\Event; +use Snowflake\Events\EventDispatch; use Snowflake\Exception\ConfigException; use Snowflake\Exception\NotFindClassException; use Snowflake\Runtime; @@ -26,6 +30,13 @@ class ServerWorker extends \Server\Abstracts\Server { + /** + * @var EventDispatch + */ + #[Inject(EventDispatch::class)] + public EventDispatch $eventDispatch; + + /** * @param Server $server * @param int $workerId @@ -34,18 +45,15 @@ class ServerWorker extends \Server\Abstracts\Server public function onWorkerStart(Server $server, int $workerId) { $this->_setConfigCache($workerId); - - $store = ApplicationStore::getStore()->instance(); - $annotation = Snowflake::app()->getAnnotation(); $annotation->read(APP_PATH . 'app'); + $this->eventDispatch->dispatch(new OnWorkerStart($server, $workerId)); + $this->runEvent(Constant::WORKER_START, null, [$server, $workerId]); $this->workerInitExecutor($server, $annotation, $workerId); $this->interpretDirectory($annotation); - - $store->close(); } @@ -128,9 +136,7 @@ class ServerWorker extends \Server\Abstracts\Server { $this->runEvent(Constant::WORKER_STOP, null, [$server, $workerId]); - Event::trigger(Event::SERVER_WORKER_STOP); - - fire(Event::SYSTEM_RESOURCE_CLEAN); + $this->eventDispatch->dispatch(new OnWorkerStop($server, $workerId)); Timer::clearAll(); } @@ -147,7 +153,7 @@ class ServerWorker extends \Server\Abstracts\Server putenv('state=exit'); - Event::trigger(Event::SERVER_WORKER_EXIT, [$server, $workerId]); + $this->eventDispatch->dispatch(new OnWorkerExit($server, $workerId)); Snowflake::getApp('logger')->insert(); } @@ -165,7 +171,7 @@ class ServerWorker extends \Server\Abstracts\Server { $this->runEvent(Constant::WORKER_ERROR, null, [$server, $worker_id, $worker_pid, $exit_code, $signal]); - Event::trigger(Event::SERVER_WORKER_ERROR); + $this->eventDispatch->dispatch(new OnWorkerError($server, $worker_id, $worker_pid, $exit_code, $signal)); $message = sprintf('Worker#%d::%d error stop. signal %d, exit_code %d, msg %s', $worker_id, $worker_pid, $signal, $exit_code, swoole_strerror(swoole_last_error(), 9) diff --git a/System/Abstracts/BaseApplication.php b/System/Abstracts/BaseApplication.php index e34e6226..082e0fb1 100644 --- a/System/Abstracts/BaseApplication.php +++ b/System/Abstracts/BaseApplication.php @@ -29,7 +29,6 @@ use Server\ServerManager; use Snowflake\Aop; use Snowflake\Async; use Snowflake\Cache\Redis; -use Snowflake\Channel; use Snowflake\Di\Service; use Snowflake\Error\ErrorHandler; use Snowflake\Error\Logger; @@ -423,16 +422,6 @@ abstract class BaseApplication extends Service } - /** - * @return Channel - * @throws Exception - */ - public function getChannel(): Channel - { - return $this->get('channel'); - } - - /** * @return Pool * @throws Exception @@ -470,7 +459,6 @@ abstract class BaseApplication extends Service 'goto' => ['class' => BaseGoto::class], 'response' => ['class' => Response::class], 'request' => ['class' => Request::class], - 'channel' => ['class' => Channel::class], 'rpc' => ['class' => \Rpc\Producer::class], 'rpc-service' => ['class' => \Rpc\Service::class], 'http2' => ['class' => Http2::class], diff --git a/System/Abstracts/TraitApplication.php b/System/Abstracts/TraitApplication.php index 64ce0301..688cbf96 100644 --- a/System/Abstracts/TraitApplication.php +++ b/System/Abstracts/TraitApplication.php @@ -19,7 +19,6 @@ use Kafka\Producer; use Rpc\Producer as RPCProducer; use Snowflake\Async; use Snowflake\Cache\Redis; -use Snowflake\Channel; use Snowflake\Error\Logger; use Snowflake\Event; use Snowflake\Jwt\Jwt; @@ -50,7 +49,6 @@ use Snowflake\Pool\Pool; * @property \Snowflake\Crontab\Producer $crontab * @property HttpFilter $filter * @property RPCProducer $rpc - * @property Channel $channel * @property Shutdown $shutdown * @property Pool $clientsPool */ diff --git a/System/Application.php b/System/Application.php index 1ad0e99f..9b4bd918 100644 --- a/System/Application.php +++ b/System/Application.php @@ -95,7 +95,7 @@ class Application extends BaseApplication if (!class_exists($service)) { throw new NotFindClassException($service); } - $class = Snowflake::createObject($service); + $class = Snowflake::getDi()->get($service); if (method_exists($class, 'onImport')) { $class->onImport($this); } diff --git a/System/Cache/Redis.php b/System/Cache/Redis.php index 4e3d7d9f..c5a6cd0f 100644 --- a/System/Cache/Redis.php +++ b/System/Cache/Redis.php @@ -9,11 +9,14 @@ declare(strict_types=1); namespace Snowflake\Cache; +use Annotation\Inject; use Exception; +use Server\Events\OnWorkerExit; +use Server\Events\OnWorkerStop; use Snowflake\Abstracts\Component; use Snowflake\Abstracts\Config; use Snowflake\Core\Json; -use Snowflake\Event; +use Snowflake\Events\EventProvider; use Snowflake\Exception\ConfigException; use Snowflake\Snowflake; @@ -21,35 +24,32 @@ use Snowflake\Snowflake; * Class Redis * @package Snowflake\Snowflake\Cache * @see \Redis - * + * @mixin \Redis */ class Redis extends Component { - /** - * @throws Exception + * @var EventProvider */ - public function init() - { - Event::on(Event::SYSTEM_RESOURCE_CLEAN, [$this, 'destroy']); - Event::on(Event::SYSTEM_RESOURCE_RELEASES, [$this, 'release']); - Event::on(Event::SERVER_WORKER_START, [$this, 'createPool']); - Event::on(Event::SERVER_TASK_START, [$this, 'createPool']); - } + #[Inject(EventProvider::class)] + public EventProvider $eventProvider; /** * @throws ConfigException * @throws Exception */ - public function createPool() + public function init() { $connections = Snowflake::app()->getRedisFromPool(); $config = $this->get_config(); - $length = Config::get('connections.pool.max',10); + $length = Config::get('connections.pool.max', 10); + + $this->eventProvider->on(OnWorkerStop::class, [$this, 'destroy'], 0); + $this->eventProvider->on(OnWorkerExit::class, [$this, 'destroy'], 0); $connections->initConnections('Redis:' . $config['host'], true, $length); } @@ -68,12 +68,11 @@ class Redis extends Component $data = $this->{$name}(...$arguments); } else { $data = $this->proxy()->{$name}(...$arguments); + $this->release(); } - if (microtime(true) - $time >= 0.02) { $this->warning('Redis:' . Json::encode([$name, $arguments]) . (microtime(true) - $time)); } - return $data; } @@ -94,7 +93,7 @@ if (_nx ~= 0) then end return 0 SCRIPT; - return $this->proxy()->eval($script, ['{lock}:' . $key, $timeout], 1); + return $this->eval($script, ['{lock}:' . $key, $timeout], 1); } @@ -105,8 +104,7 @@ SCRIPT; */ public function unlock($key): int { - $redis = $this->proxy(); - return $redis->del('{lock}:' . $key); + return $this->del('{lock}:' . $key); } diff --git a/System/Channel.php b/System/Channel.php deleted file mode 100644 index 1c601b93..00000000 --- a/System/Channel.php +++ /dev/null @@ -1,108 +0,0 @@ - $value) { - if (empty($value)) { - continue; - } - $channel = $this->channelInit($key); - if ($channel->count() >= 100) { - continue; - } - foreach ($value as $item) { - $channel->enqueue($item); - } - } - Channel::$_waitRecover = []; - } - - - /** - * @param mixed $value - * @param string $name - * @throws Exception - */ - public function push(mixed $value, string $name = ''): void - { - if (!isset(Channel::$_waitRecover[$name])) { - Channel::$_waitRecover[$name] = []; - } - Channel::$_waitRecover[$name][] = $value; - } - - - /** - * @param string $name - * @return bool|SplQueue - */ - private function channelInit(string $name = ''): bool|SplQueue - { - if (!isset(static::$_channels[$name]) || !(static::$_channels[$name] instanceof SplQueue)) { - static::$_channels[$name] = new SplQueue(); - } - return static::$_channels[$name]; - } - - - /** - * - * 清空缓存 - */ - public function cleanAll() - { - static::$_channels = null; - static::$_channels = []; - } - - - /** - * @param string $name - * @param Closure $closure - * @return mixed - */ - public function pop(string $name, Closure $closure): mixed - { - $channel = $this->channelInit($name); - if ($channel->isEmpty()) { - return call_user_func($closure); - } - return $channel->dequeue(); - } - - -} diff --git a/System/Error/Logger.php b/System/Error/Logger.php index 1da089f8..3767ba33 100644 --- a/System/Error/Logger.php +++ b/System/Error/Logger.php @@ -9,11 +9,14 @@ declare(strict_types=1); namespace Snowflake\Error; +use Annotation\Inject; use Exception; +use Server\Events\OnAfterRequest; use Snowflake\Abstracts\Component; use Snowflake\Abstracts\Config; use Snowflake\Core\Json; use Snowflake\Event; +use Snowflake\Events\EventProvider; use Snowflake\Exception\ConfigException; use Snowflake\Snowflake; use Swoole\Coroutine; @@ -29,15 +32,20 @@ class Logger extends Component private array $logs = []; + /** @var EventProvider */ + #[Inject(EventProvider::class)] + public EventProvider $eventProvider; + + private array $sources = []; + /** + * + */ public function init() { - Event::on(Event::SYSTEM_RESOURCE_CLEAN, [$this, 'insert']); - Event::on(Event::SYSTEM_RESOURCE_CLEAN, [$this, 'closeSource']); - Event::on(Event::SYSTEM_RESOURCE_RELEASES, [$this, 'closeSource']); - Event::on(Event::SYSTEM_RESOURCE_RELEASES, [$this, 'insert']); + $this->eventProvider->on(OnAfterRequest::class, [$this, 'insert']); } diff --git a/System/Event.php b/System/Event.php index 80b57448..cd8ff5a4 100644 --- a/System/Event.php +++ b/System/Event.php @@ -7,6 +7,7 @@ namespace Snowflake; use Exception; use Snowflake\Abstracts\BaseObject; +use Swoole\Coroutine; /** * Class Event @@ -85,18 +86,16 @@ class Event extends BaseObject * @param bool $isAppend * @throws Exception */ - public static function on($name, $callback, array $parameter = [], bool $isAppend = false) + public static function on($name, $callback, bool $isAppend = false) { if (!isset(static::$_events[$name])) { static::$_events[$name] = []; } - if ($callback instanceof \Closure) { - $callback = \Closure::bind($callback, Snowflake::app()); - } else if (is_array($callback) && is_string($callback[0])) { + if (is_array($callback) && is_string($callback[0])) { if (!class_exists($callback[0])) { throw new Exception('Undefined callback class.'); } - $callback[0] = Snowflake::createObject($callback[0]); + $callback[0] = di($callback[0]); } if (static::exists($name, $callback)) { return; @@ -130,15 +129,10 @@ class Event extends BaseObject /** * @param $name - * @return bool */ - public static function offName($name): bool + public static function offName($name): void { - if (!static::exists($name)) { - return true; - } unset(static::$_events[$name]); - return static::exists($name); } @@ -147,14 +141,11 @@ class Event extends BaseObject * @param null $callback * @return bool */ - public static function exists($name, $callback = null): bool + public static function exists($name, $callback): bool { - if (!isset(static::$_events[$name])) { + if ($callback instanceof \Closure || !isset(static::$_events[$name])) { return false; } - if ($callback === null) { - return true; - } foreach (static::$_events[$name] as $event) { [$handler, $parameter] = $event; if ($handler === $callback) { @@ -175,7 +166,6 @@ class Event extends BaseObject if (!static::exists($name, $handler)) { return null; } - if (empty($handler)) { return static::$_events[$name]; } @@ -216,11 +206,14 @@ class Event extends BaseObject */ public static function trigger($name, $parameter = null, bool $is_remove = false): bool { - foreach ((static::$_events[$name] ?? []) as $event) { + foreach ((static::$_events[$name] ?? []) as $key => $event) { static::execute($event, $parameter); + if ($event instanceof \Closure) { + unset(static::$_events[$name][$key]); + } } if ($is_remove) { - static::offName($name); + unset(static::$_events[$name]); } return true; } @@ -229,45 +222,19 @@ class Event extends BaseObject /** * @param $event * @param $parameter - * @return bool + * @return void * @throws Exception */ - private static function execute($event, $parameter): bool + private static function execute($event, $parameter): void { try { - $meta = static::mergeParams($event[1], $parameter); - if (call_user_func($event[0], ...$meta) === false) { - return false; - } - return true; + call_user_func($event[0], ...$parameter); } catch (\Throwable $throwable) { - return logger()->addError($throwable, 'throwable'); + logger()->addError($throwable, 'throwable'); + return; } } - /** - * @param $defaultParameter - * @param array $parameter - * @return array - */ - private static function mergeParams($defaultParameter, array $parameter = []): array - { - if (empty($defaultParameter)) { - $defaultParameter = $parameter; - } else { - if (!is_array($parameter)) { - $parameter = []; - } - foreach ($parameter as $key => $value) { - $defaultParameter[] = $value; - } - } - if (!is_array($defaultParameter)) { - $defaultParameter = [$defaultParameter]; - } - return $defaultParameter; - } - } diff --git a/System/Events/EventDispatch.php b/System/Events/EventDispatch.php index e3531c2c..8716b799 100644 --- a/System/Events/EventDispatch.php +++ b/System/Events/EventDispatch.php @@ -2,31 +2,36 @@ namespace Snowflake\Events; -class EventDispatch +use Annotation\Inject; +use Snowflake\Abstracts\BaseObject; + + +/** + * + */ +class EventDispatch extends BaseObject { - private EventListener $eventListener; + #[Inject(EventProvider::class)] + public EventProvider $eventProvider; /** - * @param $event - * @param array $params + * @param object $triggerEvent + * @return object */ - public function emit($event, array $params = []) + public function dispatch(object $triggerEvent): object { - $events = $this->eventListener->getEventListeners($event); - if (empty($events)) { - return; - } - while ($events->valid()) { - /** @var EventDispatchInterface $interface */ - $interface = $events->current(); - $interface->onHandler(); - if ($interface->stopPagination()) { + $lists = $this->eventProvider->getListenersForEvent($triggerEvent); + foreach ($lists as $listener) { + /** @var Struct $list */ + $listener($triggerEvent); + if ($triggerEvent instanceof StoppableEventInterface && $triggerEvent->isPropagationStopped()) { break; } - $events->next(); } + return $triggerEvent; } + } diff --git a/System/Events/EventDispatchInterface.php b/System/Events/EventDispatchInterface.php deleted file mode 100644 index 42fb8986..00000000 --- a/System/Events/EventDispatchInterface.php +++ /dev/null @@ -1,18 +0,0 @@ -_events[$event])) { - $this->_events[$event] = new SplPriorityQueue(); - } - $this->_events[$event]->insert($handler, $handler->getZOrder()); - } - - - /** - * @param $event - * @return SplPriorityQueue|null - */ - public function getEventListeners($event): ?SplPriorityQueue - { - return $this->_events[$event] ?? null; - } - - -} diff --git a/System/Events/EventProvider.php b/System/Events/EventProvider.php new file mode 100644 index 00000000..d3682f0e --- /dev/null +++ b/System/Events/EventProvider.php @@ -0,0 +1,45 @@ +_listeners as $listener) { + if (!($event instanceof $listener->event)) { + continue; + } + $queue->insert($listener, $listener->priority); + } + return $queue; + } + + + /** + * @param string $event + * @param callable $handler + * @param int $zOrder + */ + public function on(string $event, callable $handler, int $zOrder = 1) + { + $this->_listeners[$event][] = new Struct($event, $handler, $zOrder); + } + + +} diff --git a/System/Events/EventProviders.php b/System/Events/EventProviders.php index 88441e1e..7b9e741e 100644 --- a/System/Events/EventProviders.php +++ b/System/Events/EventProviders.php @@ -2,7 +2,21 @@ namespace Snowflake\Events; -class EventProviders + +/** + * + */ +interface EventProviders { + + /** + * @param object $event + * An event for which to return the relevant listeners. + * @return iterable + * An iterable (array, iterator, or generator) of callables. Each + * callable MUST be type-compatible with $event. + */ + public function getListenersForEvent(object $event): iterable; + } diff --git a/System/Events/StoppableEventInterface.php b/System/Events/StoppableEventInterface.php new file mode 100644 index 00000000..c5c37eea --- /dev/null +++ b/System/Events/StoppableEventInterface.php @@ -0,0 +1,24 @@ +event = $event; + $this->listener = $listener; + $this->priority = $priority; + } +} diff --git a/System/Pool/Connection.php b/System/Pool/Connection.php index 0e0ec1a9..0facbc8d 100644 --- a/System/Pool/Connection.php +++ b/System/Pool/Connection.php @@ -182,7 +182,7 @@ class Connection extends Component return; } if ($client->inTransaction()) { - $client->commit(); + return; } $this->getPool()->push($coroutineName, $client); Context::remove($coroutineName); diff --git a/function.php b/function.php index 6e152f22..32eabfce 100644 --- a/function.php +++ b/function.php @@ -538,14 +538,12 @@ if (!function_exists('event')) { /** * @param $name * @param $callback - * @param array $params * @param bool $isAppend * @throws Exception - * @throws Exception */ - function event($name, $callback, array $params = [], bool $isAppend = true) + function event($name, $callback, bool $isAppend = true) { - Event::on($name, $callback, $params, $isAppend); + Event::on($name, $callback, $isAppend); } }