10, 'min' => 1];

    /** Transaction nesting depth: raised by beginTransaction(), lowered by commit()/rollback(). */
    private int $storey = 0;

    /** Id returned by Timer::tick(); -1 while no health-check timer is registered. */
    protected int $timerId = -1;

    public bool $enableCache = false;

    public string $cacheDriver = 'redis';

    /** Extra attributes applied with setAttribute() to every connection built by newConnect(). */
    public array $attributes = [];

    public array $slave = [];

    /** Optional query-log callback, read from the `databases.logger` config entry. */
    protected ?\Closure $_println = null;

    /**
     * @var StdoutLogger
     */
    #[Container(LoggerInterface::class)]
    public StdoutLogger $logger;

    /**
     * @var EventProvider
     */
    #[Container(EventProvider::class)]
    public EventProvider $eventProvider;

    /**
     * Stores the shared pool and loads the optional query-log callback from configuration.
     *
     * @param Pool $connections pool that owns the channel used by this connection
     */
    public function __construct(public Pool $connections)
    {
        parent::__construct();
        $this->_println = \config('databases.logger', null);
    }

    /**
     * Forwards one executed statement to the configured log callback; does nothing when none is set.
     *
     * @param float $startTime
     * @param float $endTime
     * @param string $sql
     * @param array $params
     * @return void
     */
    public function println(float $startTime, float $endTime, string $sql, array $params = []): void
    {
        $callback = $this->_println;
        if ($callback !== null) {
            $callback($startTime, $endTime, $sql, $params);
        }
    }

    /**
     * Subscribes this connection to the transaction, request and worker lifecycle events.
     *
     * @return void
     */
    public function init(): void
    {
        $events = $this->eventProvider;

        // Transaction control events map straight onto the methods of the same name.
        $events->on(BeginTransaction::class, [$this, 'beginTransaction'], 0);
        $events->on(Rollback::class, [$this, 'rollback'], 0);
        $events->on(Commit::class, [$this, 'commit'], 0);

        // Request / worker lifecycle: cleanup after each request, pool shutdown, periodic health check.
        $events->on(OnAfterRequest::class, [$this, 'clear']);
        $events->on(OnWorkerExit::class, [$this, 'disconnect']);
        $events->on(OnWorkerStart::class, [$this, 'tick']);
        $events->on(OnTaskerStart::class, [$this, 'tick']);
    }

    /**
     * Registers the periodic health check and remembers its timer id so disconnect() can cancel it.
     *
     * @return void
     */
    public function tick(): void
    {
        $this->timerId = Timer::tick(
            $this->tick_time,
            fn () => $this->checkClientHealth($this->pool())
        );
    }

    /**
     * Periodic maintenance: flushes this connection's channel with the configured `min` value (default 1).
     *
     * NOTE(review): presumably flush() trims idle connections down to that minimum — confirm against Pool.
     *
     * @param Pool $pool
     * @return void
     */
    protected function checkClientHealth(Pool $pool): void
    {
        $pool->flush($this->getName(), $this->pool['min'] ?? 1);

        // Disabled per-connection validation loop, kept for reference:
        // $length = $pool->size($this->getName());
        // for ($i = 0; $i < $length; $i++) {
        //     try {
        //         if (($client = $this->validator($pool)) === false) {
        //             break;
        //         }
        //         $pool->push($this->getName(), $client);
        //     } catch (\Throwable $exception) {
        //         if (!str_contains($exception->getMessage(), 'Client timeout.')) {
        //             $this->logger->error(throwable($exception), [$this->cds]);
        //         }
        //     }
        // }
    }

    /**
     * Builds the pool channel name: the lower-cased driver, a dot, then the `cds` value.
     *
     * @return string
     */
    public function getName(): string
    {
        return strtolower($this->driver) . '.' . $this->cds;
    }

    /**
     * Takes one connection from the pool and probes it with `select 1`.
     *
     * @param Pool $pool
     * @return PDO|bool the probed connection, or false when the pool returned none
     * @throws Exception when the probe query reports failure
     */
    protected function validator(Pool $pool): PDO|bool
    {
        /** @var PDO|false $client */
        $client = $pool->get($this->getName());
        if ($client === false) {
            return false;
        }
        if ($client->query('select 1') === false) {
            throw new Exception($client->errorInfo()[1]);
        }
        return $client;
    }

    /**
     * Returns the connection to run a statement on: the transaction-bound one while a
     * transaction is open, otherwise one taken from the pool.
     *
     * @return PDO
     * @throws Exception when no pooled connection becomes available
     */
    public function getConnection(): PDO
    {
        if ($this->inTransaction()) {
            return $this->getTransactionClient();
        }
        return $this->getNormalClientHealth();
    }

    /**
     * Takes a connection from the pool, passing `waite_time` as the second argument to Pool::get().
     *
     * @return PDO
     * @throws Exception when the pool returns false
     */
    protected function getNormalClientHealth(): PDO
    {
        $client = $this->pool()->get($this->getName(), $this->waite_time);
        if ($client === false) {
            throw new Exception('Client Waite timeout.');
        }
        return $client;
    }

    /**
     * Opens one more transaction level. Only the counter changes here; the driver-level
     * transaction is started by getTransactionClient().
     *
     * @return $this
     */
    public function beginTransaction(): static
    {
        // A stray commit()/rollback() can push the counter below zero; normalise before counting up.
        if ($this->storey < 0) {
            $this->storey = 0;
        }
        $this->storey++;
        return $this;
    }

    /**
     * Returns the connection stored in Context under `cds`, taking one from the pool on first
     * use, and starts the driver-level transaction if a transaction level is open.
     *
     * @return PDO
     * @throws Exception when no pooled connection becomes available
     */
    public function getTransactionClient(): PDO
    {
        $pdo = Context::get($this->cds);
        if ($pdo === null) {
            /** @var PDO $pdo */
            $pdo = Context::set($this->cds, $this->getNormalClientHealth());
        }
        if ($this->storey > 0 && !$pdo->inTransaction()) {
            $pdo->beginTransaction();
        }
        return $pdo;
    }

    /**
     * Reports whether at least one transaction level is open.
     *
     * @return bool
     */
    public function inTransaction(): bool
    {
        return $this->storey > 0;
    }

    /**
     * Transaction rollback: closes one level; when the outermost level closes, rolls the bound
     * connection back, returns it to the pool and removes it from Context.
     *
     * @return void
     */
    public function rollback(): void
    {
        $this->storey--;
        if ($this->storey != 0) {
            return;
        }
        // No statement ran inside the transaction, so no connection was ever bound.
        if (!Context::exists($this->cds)) {
            return;
        }
        $pdo = $this->getTransactionClient();
        if ($pdo->inTransaction()) {
            $pdo->rollback();
        }
        $this->release($pdo);
        Context::remove($this->cds);
    }

    /**
     * Transaction commit: closes one level; when the outermost level closes, commits the bound
     * connection, returns it to the pool and removes it from Context.
     *
     * @return void
     */
    public function commit(): void
    {
        $this->storey--;
        if ($this->storey != 0) {
            return;
        }
        // No statement ran inside the transaction, so no connection was ever bound.
        if (!Context::exists($this->cds)) {
            return;
        }
        $pdo = $this->getTransactionClient();
        if ($pdo->inTransaction()) {
            $pdo->commit();
        }
        $this->release($pdo);
        Context::remove($this->cds);
    }

    /**
     * End-of-request cleanup: rolls back a transaction that was left open, returns the bound
     * connection to the pool and resets the nesting counter.
     *
     * @return void
     */
    public function clear(): void
    {
        /** @var PDO|null $pdo */
        $pdo = Context::get($this->cds);
        if ($pdo === null) {
            return;
        }
        // Same guard as rollback()/commit(): only roll back a transaction the driver actually opened.
        if ($this->inTransaction() && $pdo->inTransaction()) {
            $pdo->rollback();
        }
        // Reset the depth BEFORE releasing: release() refuses to push while inTransaction() is
        // true, which previously left the connection out of the pool for good.
        $this->storey = 0;
        $this->release($pdo);
        Context::remove($this->cds);
    }

    /**
     * Creates a Command bound to this connection for the given SQL and binds the supplied values.
     *
     * @param string $sql
     * @param array $attributes values handed to Command::bindValues()
     * @return Command
     */
    public function createCommand(string $sql, array $attributes = []): Command
    {
        // Parenthesised so the expression also parses on PHP versions before 8.4.
        return (new Command(['connection' => $this, 'sql' => $sql]))->bindValues($attributes);
    }

    /**
     * Returns a connection to the pool. While a transaction level is open the connection is
     * kept, because it is still bound to that transaction.
     *
     * @param PDO $pdo
     * @return void
     */
    public function release(PDO $pdo): void
    {
        if ($this->inTransaction()) {
            return;
        }
        $this->pool()->push($this->getName(), $pdo);
    }

    /**
     * Flushes this connection's pool channel with a retain count of 0.
     *
     * @return void
     */
    public function clear_connection(): void
    {
        $this->pool()->flush($this->getName(), 0);
    }

    /**
     * Cancels the health-check timer, if one is registered, and closes this connection's pool channel.
     *
     * @return void
     */
    public function disconnect(): void
    {
        if ($this->timerId > -1) {
            Timer::clear($this->timerId);
            // Forget the id so a repeated call cannot clear an unrelated timer.
            $this->timerId = -1;
        }
        $this->pool()->close($this->getName());
    }

    /**
     * Factory handed to the pool: builds a new connection from this object's settings and
     * applies the extra `attributes`.
     *
     * NOTE(review): `PDO` is constructed with six arguments, so it is the class imported by this
     * file, not the built-in \PDO constructor — confirm it satisfies the \PDO return type.
     *
     * @return PDO
     */
    public function newConnect(): \PDO
    {
        $driver = strtolower($this->driver);
        $options = [
            \PDO::ATTR_CASE              => \PDO::CASE_NATURAL,
            \PDO::ATTR_ERRMODE           => \PDO::ERRMODE_EXCEPTION,
            \PDO::ATTR_ORACLE_NULLS      => \PDO::NULL_NATURAL,
            \PDO::ATTR_STRINGIFY_FETCHES => false,
            \PDO::ATTR_EMULATE_PREPARES  => true,
            \PDO::ATTR_TIMEOUT           => $this->timeout,
        ];

        // MySQL-specific option
        if ($driver === 'mysql') {
            $options[\PDO::MYSQL_ATTR_INIT_COMMAND] = 'SET NAMES ' . $this->charset;
        }

        $pdo = new PDO($this->database, $this->cds, $this->username, $this->password, $options, $driver);
        foreach ($this->attributes as $key => $attribute) {
            $pdo->setAttribute($key, $attribute);
        }
        return $pdo;
    }

    /**
     * Returns the shared pool, creating this connection's channel on first use with the
     * configured `max` size (default 1) and newConnect() as the factory.
     *
     * @return Pool
     */
    protected function pool(): Pool
    {
        $name = $this->getName();
        if (!$this->connections->hasChannel($name)) {
            $this->connections->created($name, $this->pool['max'] ?? 1, [$this, 'newConnect']);
        }
        return $this->connections;
    }
}