getEventProvider(); $provider->on(OnWorkerStop::class, [$this, 'clear_connection'], 0); $provider->on(OnWorkerExit::class, [$this, 'clear_connection'], 0); $provider->on(BeginTransaction::class, [$this, 'beginTransaction'], 0); $provider->on(Rollback::class, [$this, 'rollback'], 0); $provider->on(Commit::class, [$this, 'commit'], 0); $this->connectPoolInstance(); } /** * @param $isSearch * @return PDO * @throws Exception */ public function getConnect($isSearch): PDO { return !$isSearch ? $this->masterInstance() : $this->slaveInstance(); } /** * @throws Exception */ public function connectPoolInstance() { $connections = $this->connections(); $pool = Config::get('databases.pool.max', 10); $connections->initConnections('Mysql:' . $this->cds, $pool); if (!empty($this->slaveConfig) && $this->cds != $this->slaveConfig['cds']) { $connections->initConnections('Mysql:' . $this->slaveConfig['cds'], $pool); } } /** * @return mixed * @throws ReflectionException * @throws NotFindClassException * @throws Exception */ public function getSchema(): Schema { if ($this->_schema === null) { $this->_schema = Kiri::createObject([ 'class' => Schema::class, 'db' => $this ]); } return $this->_schema; } /** * @return PDO * @throws Exception */ public function masterInstance(): PDO { return $this->connections()->get([ 'cds' => $this->cds, 'username' => $this->username, 'password' => $this->password, 'attributes' => $this->attributes, 'connect_timeout' => $this->connect_timeout, 'read_timeout' => $this->read_timeout, 'dbname' => $this->database, 'pool' => $this->pool ]); } /** * @return PDO * @throws Exception */ public function slaveInstance(): PDO { if (empty($this->slaveConfig) || $this->slaveConfig['cds'] == $this->cds) { return $this->masterInstance(); } return $this->connections()->get($this->slaveConfig); } /** * @return \Kiri\Pool\Connection * @throws Exception */ private function connections(): \Kiri\Pool\Connection { return Kiri::getDi()->get(\Kiri\Pool\Connection::class); } /** * @return $this * @throws Exception */ public function beginTransaction(): static { $pdo = $this->masterInstance(); $pdo->beginTransaction(); return $this; } /** * @return $this|bool * @throws Exception */ public function inTransaction(): bool|static { $pdo = $this->masterInstance(); return $pdo->inTransaction(); } /** * @throws Exception * 事务回滚 */ public function rollback() { $pdo = $this->masterInstance(); Context::remove($this->cds); if ($pdo->inTransaction()) { $pdo->rollback(); } $this->release($pdo, $this->cds); } /** * @throws Exception * 事务提交 */ public function commit() { $pdo = $this->masterInstance(); Context::remove($this->cds); if ($pdo->inTransaction()) { $pdo->commit(); } $this->release($pdo, $this->cds); } /** * @param null $sql * @param array $attributes * @return Command * @throws Exception */ public function createCommand($sql = null, array $attributes = []): Command { $command = new Command(['db' => $this, 'sql' => $sql]); return $command->bindValues($attributes); } /** * * 回收链接 * @throws */ public function release(PDO $pdo, $isMaster) { $connections = $this->connections(); if ($pdo->inTransaction()) { return; } if (!$isMaster) { if (empty($this->slaveConfig) || !isset($this->slaveConfig['cds'])) { $this->slaveConfig['cds'] = $this->cds; } $connections->addItem($this->slaveConfig['cds'], $pdo); } else { $connections->addItem($this->cds, $pdo); } Context::remove($this->cds); Context::remove($this->slaveConfig['cds']); } /** * * 回收链接 * @throws */ public function clear_connection() { $connections = $this->connections(); $connections->connection_clear($this->cds); if (empty($this->slaveConfig) || !isset($this->slaveConfig['cds'])) { $this->slaveConfig['cds'] = $this->cds; } $connections->connection_clear($this->slaveConfig['cds']); } /** * @throws Exception */ public function disconnect() { $connections = $this->connections(); $connections->disconnect($this->cds); if (empty($this->slaveConfig) || !isset($this->slaveConfig['cds'])) { $this->slaveConfig['cds'] = $this->cds; } $connections->disconnect($this->slaveConfig['cds']); } }