This commit is contained in:
xl
2023-05-26 09:20:29 +08:00
parent 49742a08ad
commit c8159194bf
3 changed files with 284 additions and 333 deletions
+267 -260
View File
@@ -31,268 +31,275 @@ use ReflectionException;
*/ */
class Connection extends Component class Connection extends Component
{ {
public string $id = 'db';
public string $cds = '';
public string $password = '';
public string $username = '';
public string $charset = 'utf-8';
public string $tablePrefix = '';
public string $database = '';
public int $connect_timeout = 30;
public int $read_timeout = 10;
public array $pool = ['max' => 10, 'min' => 1];
private int $storey = 0;
/**
* @var bool
* enable database cache
*/
public bool $enableCache = false;
private ?PDO $_pdo = null;
/**
* @var string
*/
public string $cacheDriver = 'redis';
/**
* @var array
*/
public array $slaveConfig = [];
public array $attributes = [];
private ?Schema $_schema = null;
public string $id = 'db';
/** public string $cds = '';
* @return void public string $password = '';
* @throws Exception public string $username = '';
*/ public string $charset = 'utf-8';
public function init(): void
{
$eventProvider = Kiri::getDi()->get(EventProvider::class); public string $tablePrefix = '';
$eventProvider->on(BeginTransaction::class, [$this, 'beginTransaction'], 0);
$eventProvider->on(Rollback::class, [$this, 'rollback'], 0); public string $database = '';
$eventProvider->on(Commit::class, [$this, 'commit'], 0);
public int $connect_timeout = 30;
$this->initConnections();
} public int $read_timeout = 10;
public array $pool = ['max' => 10, 'min' => 1];
/** private int $storey = 0;
* @return void
* @throws ReflectionException /**
*/ * @var bool
public function initConnections(): void * enable database cache
{ */
$connections = Kiri::getDi()->get(Pool::class); public bool $enableCache = false;
$connections->initConnections($this->cds, $this->pool['max'] ?? 1, $this->gender([
'cds' => $this->cds,
'username' => $this->username, private ?PDO $_pdo = null;
'password' => $this->password,
'attributes' => $this->attributes,
'connect_timeout' => $this->connect_timeout, private Pool $connections;
'read_timeout' => $this->read_timeout,
'dbname' => $this->database,
'pool' => $this->pool /**
])); * @var string
} */
public string $cacheDriver = 'redis';
/** /**
* @param array $config * @var array
* @return \Closure */
*/ public array $slaveConfig = [];
public function gender(array $config): \Closure public array $attributes = [];
{
return static function () use ($config) {
$options = [ private ?Schema $_schema = null;
PDO::ATTR_CASE => PDO::CASE_NATURAL,
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_ORACLE_NULLS => PDO::NULL_NATURAL, /**
PDO::ATTR_STRINGIFY_FETCHES => false, * @return void
PDO::ATTR_EMULATE_PREPARES => false, * @throws Exception
PDO::ATTR_TIMEOUT => $config['connect_timeout'], */
PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . ($config['charset'] ?? 'utf8mb4') public function init(): void
]; {
if (!Context::inCoroutine()) { $eventProvider = Kiri::getDi()->get(EventProvider::class);
$options[PDO::ATTR_PERSISTENT] = true; $eventProvider->on(BeginTransaction::class, [$this, 'beginTransaction'], 0);
} $eventProvider->on(Rollback::class, [$this, 'rollback'], 0);
$link = new PDO('mysql:dbname=' . $config['dbname'] . ';host=' . $config['cds'], $eventProvider->on(Commit::class, [$this, 'commit'], 0);
$config['username'], $config['password'], $options);
foreach ($config['attributes'] as $key => $attribute) { $this->connections = Kiri::getDi()->get(Pool::class);
$link->setAttribute($key, $attribute); }
}
return $link;
}; /**
} * @param array $config
* @return \Closure
*/
/** public function gender(array $config): \Closure
* @return mixed {
* @throws ReflectionException return static function () use ($config) {
* @throws NotFindClassException $options = [
* @throws Exception PDO::ATTR_CASE => PDO::CASE_NATURAL,
*/ PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
public function getSchema(): Schema PDO::ATTR_ORACLE_NULLS => PDO::NULL_NATURAL,
{ PDO::ATTR_STRINGIFY_FETCHES => false,
if ($this->_schema === null) { PDO::ATTR_EMULATE_PREPARES => false,
$this->_schema = Kiri::createObject([ PDO::ATTR_TIMEOUT => $config['connect_timeout'],
'class' => Schema::class, PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . ($config['charset'] ?? 'utf8mb4')
'db' => $this ];
]); if (!Context::inCoroutine()) {
} $options[PDO::ATTR_PERSISTENT] = true;
return $this->_schema; }
} $link = new PDO('mysql:dbname=' . $config['dbname'] . ';host=' . $config['cds'],
$config['username'], $config['password'], $options);
foreach ($config['attributes'] as $key => $attribute) {
/** $link->setAttribute($key, $attribute);
* @return PDO }
* @throws Exception return $link;
*/ };
public function getConnection(): PDO }
{
$connections = Kiri::getDi()->get(Pool::class);
return $connections->get($this->cds); /**
} * @return mixed
* @throws ReflectionException
* @throws NotFindClassException
/** * @throws Exception
* @return $this */
* @throws Exception public function getSchema(): Schema
*/ {
public function beginTransaction(): static if ($this->_schema === null) {
{ $this->_schema = Kiri::createObject([
$pdo = $this->getTransactionClient(); 'class' => Schema::class,
if ($this->storey == 0) { 'db' => $this
$pdo->beginTransaction(); ]);
} }
$this->storey++; return $this->_schema;
return $this; }
}
/**
/** * @return PDO
* @return PDO * @throws Exception
* @throws Exception */
*/ public function getConnection(): PDO
public function getTransactionClient(): PDO {
{ return $this->pool()->get($this->cds);
if (!Db::inTransactionsActive()) { }
return $this->getConnection();
}
$pdo = Context::get($this->cds); /**
if ($pdo === null) { * @return $this
/** @var PDO $pdo */ * @throws Exception
$pdo = Context::set($this->cds, $this->getConnection()); */
} public function beginTransaction(): static
return $pdo; {
} if ($this->storey == 0) {
/** @var PDO $pdo */
/** $pdo = Context::get($this->cds);
* @return $this|bool if ($pdo !== null && !$pdo->inTransaction()) {
* @throws Exception $pdo->beginTransaction();
*/ }
public function inTransaction(): bool|static }
{ $this->storey++;
$pdo = $this->getTransactionClient(); return $this;
return $pdo->inTransaction(); }
}
/** /**
* @throws Exception * @return PDO
* 事务回滚 * @throws Exception
*/ */
public function rollback(): void public function getTransactionClient(): PDO
{ {
$this->storey--; $pdo = Context::get($this->cds);
if ($this->storey == 0) { if ($pdo === null) {
$pdo = $this->getTransactionClient(); $pdo = $this->getConnection();
if ($pdo->inTransaction()) { if ($this->storey > 0 && !$pdo->inTransaction()) {
$pdo->rollback(); $pdo->beginTransaction();
} }
$connections = Kiri::getDi()->get(Pool::class); Context::set($this->cds, $pdo);
$connections->push($this->cds, $pdo); }
Context::remove($this->cds); return $pdo;
} }
}
/**
/** * @return bool
* @throws Exception * @throws Exception
* 事务提交 */
*/ public function inTransaction(): bool
public function commit(): void {
{ return $this->storey > 0;
$this->storey--; }
if ($this->storey == 0) {
$pdo = $this->getTransactionClient(); /**
if ($pdo->inTransaction()) { * @throws Exception
$pdo->commit(); * 事务回滚
} */
$connections = Kiri::getDi()->get(Pool::class); public function rollback(): void
$connections->push($this->cds, $pdo); {
Context::remove($this->cds); $this->storey--;
} if ($this->storey == 0) {
} $pdo = Context::get($this->cds);
if ($pdo === null) {
return;
/** }
* @param null $sql if ($pdo->inTransaction()) {
* @param array $attributes $pdo->rollback();
* @return Command }
* @throws Exception $this->pool()->push($this->cds, $pdo);
*/ Context::remove($this->cds);
public function createCommand($sql = null, array $attributes = []): Command }
{ }
$command = new Command(['connection' => $this, 'sql' => $sql]);
return $command->bindValues($attributes); /**
} * @throws Exception
* 事务提交
*/
/** public function commit(): void
* {
* 回收链接 $this->storey--;
* @throws if ($this->storey == 0) {
*/ $pdo = Context::get($this->cds);
public function release(PDO $PDO): void if ($pdo === null) {
{ return;
$connections = Kiri::getDi()->get(Pool::class); }
$connections->push($this->cds, $PDO); if ($pdo->inTransaction()) {
} $pdo->commit();
}
$this->pool()->push($this->cds, $pdo);
/** Context::remove($this->cds);
* }
* 回收链接 }
* @throws
*/
public function clear_connection(): void /**
{ * @param null $sql
$connections = Kiri::getDi()->get(Pool::class); * @param array $attributes
$connections->clean($this->cds); * @return Command
} * @throws Exception
*/
public function createCommand($sql = null, array $attributes = []): Command
/** {
* @throws Exception $command = new Command(['connection' => $this, 'sql' => $sql]);
*/ return $command->bindValues($attributes);
public function disconnect(): void }
{
$connections = Kiri::getDi()->get(Pool::class);
$connections->clean($this->cds); /**
} *
* 回收链接
* @throws
*/
public function release(PDO $PDO): void
{
$this->pool()->push($this->cds, $PDO);
}
/**
*
* 回收链接
* @throws
*/
public function clear_connection(): void
{
$this->pool()->clean($this->cds);
}
/**
* @throws Exception
*/
public function disconnect(): void
{
$this->pool()->clean($this->cds);
}
/**
* @return Pool
*/
private function pool(): Pool
{
if (!$this->connections->hasChannel($this->cds)) {
$params = [
'cds' => $this->cds,
'username' => $this->username,
'password' => $this->password,
'attributes' => $this->attributes,
'connect_timeout' => $this->connect_timeout,
'read_timeout' => $this->read_timeout,
'dbname' => $this->database,
'pool' => $this->pool
];
$itemCount = $this->pool['max'] ?? 1;
$this->connections->created($this->cds, $itemCount, $this->gender($params));
}
return $this->connections;
}
} }
+3 -41
View File
@@ -7,8 +7,6 @@ namespace Database;
use Exception; use Exception;
use Kiri; use Kiri;
use Kiri\Abstracts\Providers; use Kiri\Abstracts\Providers;
use Kiri\Config\ConfigProvider;
use Swoole\Timer;
use Kiri\Di\LocalService; use Kiri\Di\LocalService;
/** /**
@@ -22,55 +20,19 @@ class DatabasesProviders extends Providers
/** /**
* @param LocalService $application * @param LocalService $application
* @return void * @return void
* @throws \ReflectionException * @throws Exception
*/ */
public function onImport(LocalService $application): void public function onImport(LocalService $application): void
{ {
$main = Kiri::getDi()->get(Kiri\Application::class); $main = Kiri::getDi()->get(Kiri\Application::class);
$main->command(BackupCommand::class); $main->command(BackupCommand::class);
$databases = \config('databases.connections', []); $databases = \config('databases.connections', []);
if (empty($databases)) { if (empty($databases)) {
return; return;
} }
foreach ($databases as $key => $database) { foreach ($databases as $key => $database) {
$application->set('db.' . $key, $this->_settings($database)); $application->set('db.' . $key, Kiri::createObject($this->_settings($database)));
}
}
public function start(): void
{
if (!Kiri\Di\Context::inCoroutine()) {
return;
}
Timer::tick(60000, function () {
$databases = \config('databases.connections', []);
if (empty($databases)) {
return;
}
$connection = Kiri::getDi()->get(Kiri\Pool\Pool::class);
foreach ($databases as $database) {
$connection->flush($database['cds'], $database['pool']['min'] ?? 1);
}
});
}
/**
* @return void
* @throws Exception
*/
public function exit(): void
{
Timer::clearAll();
$databases = \config('databases.connections', []);
if (!empty($databases)) {
$connection = Kiri::getDi()->get(Kiri\Pool\Pool::class);
foreach ($databases as $database) {
$connection->clean($database['cds']);
}
} }
} }
+14 -32
View File
@@ -10,16 +10,14 @@ declare(strict_types=1);
namespace Database; namespace Database;
use Closure; use Closure;
use Database\Affair\BeginTransaction;
use Database\Affair\Commit; use Database\Affair\Commit;
use Database\Affair\Rollback; use Database\Affair\Rollback;
use Database\Traits\QueryTrait; use Database\Traits\QueryTrait;
use Exception; use Exception;
use Kiri\Di\Context;
use Kiri\Events\EventDispatch;
use Kiri\Exception\ConfigException; use Kiri\Exception\ConfigException;
use Psr\Container\ContainerExceptionInterface; use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface; use Psr\Container\NotFoundExceptionInterface;
use ReflectionException;
/** /**
* Class Db * Class Db
@@ -32,21 +30,12 @@ class Db implements ISqlBuilder
private static bool $_inTransaction = false; private static bool $_inTransaction = false;
/**
* @return bool
*/
public static function inTransactionsActive(): bool
{
return Context::exists('transactions::status') && Context::get('transactions::status') === true;
}
/** /**
* @return void * @return void
*/ */
public static function beginTransaction(): void public static function beginTransaction(): void
{ {
Context::set('transactions::status', true); fire(new BeginTransaction());
} }
@@ -77,28 +66,21 @@ class Db implements ISqlBuilder
} }
/** /**
* @throws ContainerExceptionInterface * @return void
* @throws NotFoundExceptionInterface */
*/
public static function commit(): void public static function commit(): void
{ {
$event = \Kiri::getDi()->get(EventDispatch::class); fire(new Commit());
$event->dispatch(new Commit());
Context::remove('transactions::status');
} }
/** /**
* @return void * @return void
* @throws ContainerExceptionInterface */
* @throws NotFoundExceptionInterface
*/
public static function rollback(): void public static function rollback(): void
{ {
$event = \Kiri::getDi()->get(EventDispatch::class); fire(new Rollback());
$event->dispatch(new Rollback());
Context::remove('transactions::status');
} }
@@ -173,11 +155,11 @@ class Db implements ISqlBuilder
/** /**
* @param Connection|null $connection * @param Connection|null $connection
* @return mixed * @return array|bool|null
* @throws Exception * @throws Exception
*/ */
public function find(Connection $connection = NULL): mixed public function find(Connection $connection = NULL): array|bool|null
{ {
$connection = static::getDefaultConnection($connection); $connection = static::getDefaultConnection($connection);
return $connection->createCommand(SqlBuilder::builder($this)->all()) return $connection->createCommand(SqlBuilder::builder($this)->all())