diff --git a/Connection.php b/Connection.php index 7ecbab6..daf1343 100644 --- a/Connection.php +++ b/Connection.php @@ -31,268 +31,275 @@ use ReflectionException; */ class Connection extends Component { - - public string $id = 'db'; - public string $cds = ''; - public string $password = ''; - public string $username = ''; - public string $charset = 'utf-8'; - - public string $tablePrefix = ''; - - public string $database = ''; - - public int $connect_timeout = 30; - - public int $read_timeout = 10; - - public array $pool = ['max' => 10, 'min' => 1]; - - - private int $storey = 0; - - /** - * @var bool - * enable database cache - */ - public bool $enableCache = false; - - - private ?PDO $_pdo = null; - - - /** - * @var string - */ - public string $cacheDriver = 'redis'; - - /** - * @var array - */ - public array $slaveConfig = []; - public array $attributes = []; - - - private ?Schema $_schema = null; - - /** - * @return void - * @throws Exception - */ - public function init(): void - { + public string $id = 'db'; + public string $cds = ''; + public string $password = ''; + public string $username = ''; + public string $charset = 'utf-8'; - $eventProvider = Kiri::getDi()->get(EventProvider::class); - $eventProvider->on(BeginTransaction::class, [$this, 'beginTransaction'], 0); - $eventProvider->on(Rollback::class, [$this, 'rollback'], 0); - $eventProvider->on(Commit::class, [$this, 'commit'], 0); - - $this->initConnections(); - } + public string $tablePrefix = ''; + + public string $database = ''; + + public int $connect_timeout = 30; + + public int $read_timeout = 10; + + public array $pool = ['max' => 10, 'min' => 1]; - /** - * @return void - * @throws ReflectionException - */ - public function initConnections(): void - { - $connections = Kiri::getDi()->get(Pool::class); - $connections->initConnections($this->cds, $this->pool['max'] ?? 1, $this->gender([ - 'cds' => $this->cds, - 'username' => $this->username, - 'password' => $this->password, - 'attributes' => $this->attributes, - 'connect_timeout' => $this->connect_timeout, - 'read_timeout' => $this->read_timeout, - 'dbname' => $this->database, - 'pool' => $this->pool - ])); - } - - - /** - * @param array $config - * @return \Closure - */ - public function gender(array $config): \Closure - { - return static function () use ($config) { - $options = [ - PDO::ATTR_CASE => PDO::CASE_NATURAL, - PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION, - PDO::ATTR_ORACLE_NULLS => PDO::NULL_NATURAL, - PDO::ATTR_STRINGIFY_FETCHES => false, - PDO::ATTR_EMULATE_PREPARES => false, - PDO::ATTR_TIMEOUT => $config['connect_timeout'], - PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . ($config['charset'] ?? 'utf8mb4') - ]; - if (!Context::inCoroutine()) { - $options[PDO::ATTR_PERSISTENT] = true; - } - $link = new PDO('mysql:dbname=' . $config['dbname'] . ';host=' . $config['cds'], - $config['username'], $config['password'], $options); - foreach ($config['attributes'] as $key => $attribute) { - $link->setAttribute($key, $attribute); - } - return $link; - }; - } - - - /** - * @return mixed - * @throws ReflectionException - * @throws NotFindClassException - * @throws Exception - */ - public function getSchema(): Schema - { - if ($this->_schema === null) { - $this->_schema = Kiri::createObject([ - 'class' => Schema::class, - 'db' => $this - ]); - } - return $this->_schema; - } - - - /** - * @return PDO - * @throws Exception - */ - public function getConnection(): PDO - { - $connections = Kiri::getDi()->get(Pool::class); - return $connections->get($this->cds); - } - - - /** - * @return $this - * @throws Exception - */ - public function beginTransaction(): static - { - $pdo = $this->getTransactionClient(); - if ($this->storey == 0) { - $pdo->beginTransaction(); - } - $this->storey++; - return $this; - } - - - /** - * @return PDO - * @throws Exception - */ - public function getTransactionClient(): PDO - { - if (!Db::inTransactionsActive()) { - return $this->getConnection(); - } - $pdo = Context::get($this->cds); - if ($pdo === null) { - /** @var PDO $pdo */ - $pdo = Context::set($this->cds, $this->getConnection()); - } - return $pdo; - } - - /** - * @return $this|bool - * @throws Exception - */ - public function inTransaction(): bool|static - { - $pdo = $this->getTransactionClient(); - return $pdo->inTransaction(); - } - - /** - * @throws Exception - * 事务回滚 - */ - public function rollback(): void - { - $this->storey--; - if ($this->storey == 0) { - $pdo = $this->getTransactionClient(); - if ($pdo->inTransaction()) { - $pdo->rollback(); - } - $connections = Kiri::getDi()->get(Pool::class); - $connections->push($this->cds, $pdo); - Context::remove($this->cds); - } - } - - /** - * @throws Exception - * 事务提交 - */ - public function commit(): void - { - $this->storey--; - if ($this->storey == 0) { - $pdo = $this->getTransactionClient(); - if ($pdo->inTransaction()) { - $pdo->commit(); - } - $connections = Kiri::getDi()->get(Pool::class); - $connections->push($this->cds, $pdo); - Context::remove($this->cds); - } - } - - - /** - * @param null $sql - * @param array $attributes - * @return Command - * @throws Exception - */ - public function createCommand($sql = null, array $attributes = []): Command - { - $command = new Command(['connection' => $this, 'sql' => $sql]); - return $command->bindValues($attributes); - } - - - /** - * - * 回收链接 - * @throws - */ - public function release(PDO $PDO): void - { - $connections = Kiri::getDi()->get(Pool::class); - $connections->push($this->cds, $PDO); - } - - - /** - * - * 回收链接 - * @throws - */ - public function clear_connection(): void - { - $connections = Kiri::getDi()->get(Pool::class); - $connections->clean($this->cds); - } - - - /** - * @throws Exception - */ - public function disconnect(): void - { - $connections = Kiri::getDi()->get(Pool::class); - $connections->clean($this->cds); - } - + private int $storey = 0; + + /** + * @var bool + * enable database cache + */ + public bool $enableCache = false; + + + private ?PDO $_pdo = null; + + + private Pool $connections; + + + /** + * @var string + */ + public string $cacheDriver = 'redis'; + + /** + * @var array + */ + public array $slaveConfig = []; + public array $attributes = []; + + + private ?Schema $_schema = null; + + + /** + * @return void + * @throws Exception + */ + public function init(): void + { + $eventProvider = Kiri::getDi()->get(EventProvider::class); + $eventProvider->on(BeginTransaction::class, [$this, 'beginTransaction'], 0); + $eventProvider->on(Rollback::class, [$this, 'rollback'], 0); + $eventProvider->on(Commit::class, [$this, 'commit'], 0); + + $this->connections = Kiri::getDi()->get(Pool::class); + } + + + /** + * @param array $config + * @return \Closure + */ + public function gender(array $config): \Closure + { + return static function () use ($config) { + $options = [ + PDO::ATTR_CASE => PDO::CASE_NATURAL, + PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION, + PDO::ATTR_ORACLE_NULLS => PDO::NULL_NATURAL, + PDO::ATTR_STRINGIFY_FETCHES => false, + PDO::ATTR_EMULATE_PREPARES => false, + PDO::ATTR_TIMEOUT => $config['connect_timeout'], + PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . ($config['charset'] ?? 'utf8mb4') + ]; + if (!Context::inCoroutine()) { + $options[PDO::ATTR_PERSISTENT] = true; + } + $link = new PDO('mysql:dbname=' . $config['dbname'] . ';host=' . $config['cds'], + $config['username'], $config['password'], $options); + foreach ($config['attributes'] as $key => $attribute) { + $link->setAttribute($key, $attribute); + } + return $link; + }; + } + + + /** + * @return mixed + * @throws ReflectionException + * @throws NotFindClassException + * @throws Exception + */ + public function getSchema(): Schema + { + if ($this->_schema === null) { + $this->_schema = Kiri::createObject([ + 'class' => Schema::class, + 'db' => $this + ]); + } + return $this->_schema; + } + + + /** + * @return PDO + * @throws Exception + */ + public function getConnection(): PDO + { + return $this->pool()->get($this->cds); + } + + + /** + * @return $this + * @throws Exception + */ + public function beginTransaction(): static + { + if ($this->storey == 0) { + /** @var PDO $pdo */ + $pdo = Context::get($this->cds); + if ($pdo !== null && !$pdo->inTransaction()) { + $pdo->beginTransaction(); + } + } + $this->storey++; + return $this; + } + + + /** + * @return PDO + * @throws Exception + */ + public function getTransactionClient(): PDO + { + $pdo = Context::get($this->cds); + if ($pdo === null) { + $pdo = $this->getConnection(); + if ($this->storey > 0 && !$pdo->inTransaction()) { + $pdo->beginTransaction(); + } + Context::set($this->cds, $pdo); + } + return $pdo; + } + + /** + * @return bool + * @throws Exception + */ + public function inTransaction(): bool + { + return $this->storey > 0; + } + + /** + * @throws Exception + * 事务回滚 + */ + public function rollback(): void + { + $this->storey--; + if ($this->storey == 0) { + $pdo = Context::get($this->cds); + if ($pdo === null) { + return; + } + if ($pdo->inTransaction()) { + $pdo->rollback(); + } + $this->pool()->push($this->cds, $pdo); + Context::remove($this->cds); + } + } + + /** + * @throws Exception + * 事务提交 + */ + public function commit(): void + { + $this->storey--; + if ($this->storey == 0) { + $pdo = Context::get($this->cds); + if ($pdo === null) { + return; + } + if ($pdo->inTransaction()) { + $pdo->commit(); + } + $this->pool()->push($this->cds, $pdo); + Context::remove($this->cds); + } + } + + + /** + * @param null $sql + * @param array $attributes + * @return Command + * @throws Exception + */ + public function createCommand($sql = null, array $attributes = []): Command + { + $command = new Command(['connection' => $this, 'sql' => $sql]); + return $command->bindValues($attributes); + } + + + /** + * + * 回收链接 + * @throws + */ + public function release(PDO $PDO): void + { + $this->pool()->push($this->cds, $PDO); + } + + + /** + * + * 回收链接 + * @throws + */ + public function clear_connection(): void + { + $this->pool()->clean($this->cds); + } + + + /** + * @throws Exception + */ + public function disconnect(): void + { + $this->pool()->clean($this->cds); + } + + + /** + * @return Pool + */ + private function pool(): Pool + { + if (!$this->connections->hasChannel($this->cds)) { + $params = [ + 'cds' => $this->cds, + 'username' => $this->username, + 'password' => $this->password, + 'attributes' => $this->attributes, + 'connect_timeout' => $this->connect_timeout, + 'read_timeout' => $this->read_timeout, + 'dbname' => $this->database, + 'pool' => $this->pool + ]; + $itemCount = $this->pool['max'] ?? 1; + $this->connections->created($this->cds, $itemCount, $this->gender($params)); + } + return $this->connections; + } + } diff --git a/DatabasesProviders.php b/DatabasesProviders.php index 3b40f39..ee64898 100644 --- a/DatabasesProviders.php +++ b/DatabasesProviders.php @@ -7,8 +7,6 @@ namespace Database; use Exception; use Kiri; use Kiri\Abstracts\Providers; -use Kiri\Config\ConfigProvider; -use Swoole\Timer; use Kiri\Di\LocalService; /** @@ -22,55 +20,19 @@ class DatabasesProviders extends Providers /** * @param LocalService $application * @return void - * @throws \ReflectionException + * @throws Exception */ public function onImport(LocalService $application): void { $main = Kiri::getDi()->get(Kiri\Application::class); $main->command(BackupCommand::class); - $databases = \config('databases.connections', []); + $databases = \config('databases.connections', []); if (empty($databases)) { return; } foreach ($databases as $key => $database) { - $application->set('db.' . $key, $this->_settings($database)); - } - } - - - public function start(): void - { - if (!Kiri\Di\Context::inCoroutine()) { - return; - } - Timer::tick(60000, function () { - $databases = \config('databases.connections', []); - if (empty($databases)) { - return; - } - - $connection = Kiri::getDi()->get(Kiri\Pool\Pool::class); - foreach ($databases as $database) { - $connection->flush($database['cds'], $database['pool']['min'] ?? 1); - } - }); - } - - - /** - * @return void - * @throws Exception - */ - public function exit(): void - { - Timer::clearAll(); - $databases = \config('databases.connections', []); - if (!empty($databases)) { - $connection = Kiri::getDi()->get(Kiri\Pool\Pool::class); - foreach ($databases as $database) { - $connection->clean($database['cds']); - } + $application->set('db.' . $key, Kiri::createObject($this->_settings($database))); } } diff --git a/Db.php b/Db.php index dd31252..27c9692 100644 --- a/Db.php +++ b/Db.php @@ -10,16 +10,14 @@ declare(strict_types=1); namespace Database; use Closure; +use Database\Affair\BeginTransaction; use Database\Affair\Commit; use Database\Affair\Rollback; use Database\Traits\QueryTrait; use Exception; -use Kiri\Di\Context; -use Kiri\Events\EventDispatch; use Kiri\Exception\ConfigException; use Psr\Container\ContainerExceptionInterface; use Psr\Container\NotFoundExceptionInterface; -use ReflectionException; /** * Class Db @@ -32,21 +30,12 @@ class Db implements ISqlBuilder private static bool $_inTransaction = false; - /** - * @return bool - */ - public static function inTransactionsActive(): bool - { - return Context::exists('transactions::status') && Context::get('transactions::status') === true; - } - - /** * @return void */ public static function beginTransaction(): void { - Context::set('transactions::status', true); + fire(new BeginTransaction()); } @@ -77,28 +66,21 @@ class Db implements ISqlBuilder } - /** - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ + /** + * @return void + */ public static function commit(): void { - $event = \Kiri::getDi()->get(EventDispatch::class); - $event->dispatch(new Commit()); - Context::remove('transactions::status'); + fire(new Commit()); } - /** - * @return void - * @throws ContainerExceptionInterface - * @throws NotFoundExceptionInterface - */ + /** + * @return void + */ public static function rollback(): void { - $event = \Kiri::getDi()->get(EventDispatch::class); - $event->dispatch(new Rollback()); - Context::remove('transactions::status'); + fire(new Rollback()); } @@ -173,11 +155,11 @@ class Db implements ISqlBuilder /** * @param Connection|null $connection - * @return mixed - * @throws Exception + * @return array|bool|null + * @throws Exception */ - public function find(Connection $connection = NULL): mixed - { + public function find(Connection $connection = NULL): array|bool|null + { $connection = static::getDefaultConnection($connection); return $connection->createCommand(SqlBuilder::builder($this)->all())