eventProvider->on(OnWorkerStop::class, [$this, 'clear_connection'], 0);
        $this->eventProvider->on(OnWorkerExit::class, [$this, 'clear_connection'], 0);
        $this->eventProvider->on(BeginTransaction::class, [$this, 'beginTransaction'], 0);
        $this->eventProvider->on(Rollback::class, [$this, 'rollback'], 0);
        $this->eventProvider->on(Commit::class, [$this, 'commit'], 0);
        $this->_schema->db = $this;
    }

    /**
     * Returns the PDO connection appropriate for the given statement.
     *
     * Thin public wrapper around getPdo(): write statements get the master
     * connection, everything else (including a null/empty statement) gets
     * whatever slaveInstance() returns.
     *
     * @param string|null $sql statement about to be executed, or null
     * @return PDO
     * @throws Exception
     */
    public function getConnect($sql = null): PDO
    {
        return $this->getPdo($sql);
    }

    /**
     * Asks the connection pool to initialise the connections for this database.
     *
     * The master entry is always initialised under the name 'Mysql:' . cds.
     * A slave entry is initialised only when a slave is configured and its
     * cds differs from the master's.
     *
     * @throws Exception
     */
    public function fill()
    {
        $connections = $this->connections();
        // Value of the `databases.pool.max` config entry, 10 when not configured.
        $pool = Config::get('databases.pool.max', 10);

        // Second argument: true for the master, false for the slave — the same
        // flag masterInstance()/slaveInstance() pass to the pool's get().
        $connections->initConnections('Mysql:' . $this->cds, true, $pool);
        if (!empty($this->slaveConfig) && $this->cds != $this->slaveConfig['cds']) {
            $connections->initConnections('Mysql:' . $this->slaveConfig['cds'], false, $pool);
        }
    }

    /**
     * Chooses between the master and the slave connection for a statement.
     *
     * @param string|null $sql statement about to be executed
     * @return PDO master connection when isWrite($sql) is true, otherwise
     *             the result of slaveInstance()
     * @throws Exception
     */
    private function getPdo($sql): PDO
    {
        return $this->isWrite($sql)
            ? $this->masterInstance()
            : $this->slaveInstance();
    }

    /**
     * Returns the Schema helper bound to this connection, creating it on
     * first use through Snowflake::createObject().
     *
     * @return Schema
     * @throws ReflectionException
     * @throws NotFindClassException
     */
    public function getSchema(): Schema
    {
        if ($this->_schema === null) {
            $this->_schema = Snowflake::createObject([
                'class' => Schema::class,
                'db'    => $this,
            ]);
        }
        return $this->_schema;
    }

    /**
     * Tells whether a statement must be treated as a write.
     *
     * A statement is a read only when it begins with the keyword SELECT,
     * compared case-insensitively after leading whitespace is stripped.
     * An empty statement is reported as "not a write"; every other statement
     * is reported as a write.
     *
     * @param string|null $sql
     * @return bool
     */
    #[Pure] public function isWrite($sql): bool
    {
        if (empty($sql)) {
            return false;
        }

        // Leading whitespace/newlines (e.g. from multi-line query strings) must
        // not turn a SELECT into a "write". Only the first six bytes matter, so
        // compare them directly instead of lowercasing the whole statement.
        return strncasecmp(ltrim($sql), 'select', 6) !== 0;
    }

    /**
     * Resolves the configured cache driver from the application container.
     *
     * @return mixed null when caching is disabled for this connection,
     *               otherwise whatever Snowflake::app()->get() returns for
     *               the configured cacheDriver id
     * @throws Exception
     */
    public function getCacheDriver(): mixed
    {
        if (!$this->enableCache) {
            return null;
        }
        return Snowflake::app()->get($this->cacheDriver);
    }

    /**
     * Fetches the master connection from the pool, using this component's
     * cds, username, password and database settings.
     *
     * @return PDO
     * @throws Exception
     */
    public function masterInstance(): PDO
    {
        return $this->connections()->get([
            'cds'      => $this->cds,
            'username' => $this->username,
            'password' => $this->password,
            'database' => $this->database,
        ], true);
    }

    /**
     * @return PDO
     *
@throws Exception
     *
     * Returns the connection to use for reads. Falls back to the master
     * connection when no slave is configured, while Db::transactionsActive()
     * reports true, or when the slave's cds equals the master's.
     */
    public function slaveInstance(): PDO
    {
        if (empty($this->slaveConfig) || Db::transactionsActive()) {
            return $this->masterInstance();
        }
        // Slave and master point at the same server: reuse the master connection.
        if ($this->slaveConfig['cds'] == $this->cds) {
            return $this->masterInstance();
        }
        return $this->connections()->get($this->slaveConfig, false);
    }

    /**
     * Returns the MySQL connection pool of the running application.
     *
     * @return \Snowflake\Pool\Connection
     * @throws Exception
     */
    private function connections(): \Snowflake\Pool\Connection
    {
        return Snowflake::app()->getMysqlFromPool();
    }

    /**
     * Begins a transaction on the master connection identified by cds.
     *
     * @return $this
     * @throws Exception
     */
    public function beginTransaction(): static
    {
        $this->connections()->beginTransaction($this->cds);
        return $this;
    }

    /**
     * Reports the pool's transaction state for the master connection.
     *
     * @return $this|bool whatever the pool's inTransaction() returns for cds
     * @throws Exception
     */
    public function inTransaction(): bool|static
    {
        return $this->connections()->inTransaction($this->cds);
    }

    /**
     * Rolls back the transaction on the master connection, then releases
     * the connections via release().
     *
     * @throws Exception
     */
    public function rollback()
    {
        $this->connections()->rollback($this->cds);
        $this->release();
    }

    /**
     * Commits the transaction on the master connection, then releases
     * the connections via release().
     *
     * @throws Exception
     */
    public function commit()
    {
        $this->connections()->commit($this->cds);
        $this->release();
    }

    /**
     * Returns the connection appropriate for the statement: master for
     * writes, slaveInstance() otherwise. Same selection as getPdo().
     *
     * @param string|null $sql
     * @return PDO
     * @throws Exception
     */
    public function refresh($sql): PDO
    {
        return $this->getPdo($sql);
    }

    /**
     * Builds a Command bound to this connection.
     *
     * @param string|null $sql statement text for the command
     * @param array $attributes values passed to Command::bindValues()
     * @return Command
     * @throws Exception
     */
    public function createCommand($sql = null, array $attributes = []): Command
    {
        $command = new Command(['db' => $this, 'sql' => $sql]);
        return $command->bindValues($attributes);
    }

    /**
     * Recycles the connections held for this database.
     *
     * Outside of a worker or process context the connections are
     * disconnected through clear_connection(); otherwise they are handed
     * back to the pool.
     *
     * @throws Exception
     */
    public function release()
    {
        if (!Snowflake::isWorker() && !Snowflake::isProcess()) {
            $this->clear_connection();
            return;
        }

        $connections = $this->connections();
        $connections->release($this->cds, true);
        // slaveConfig may be empty (fill() and slaveInstance() both allow for
        // that); reading its 'cds' key unguarded raises a PHP warning.
        if (isset($this->slaveConfig['cds'])) {
            $connections->release($this->slaveConfig['cds'], false);
        }
    }

    /**
     * Hands the master connection and, when one is configured, the slave
     * connection back to the pool.
     *
     * @throws Exception
     */
    public function recovery()
    {
        $connections = $this->connections();
        $connections->release($this->cds, true);
        // Skip the slave when none is configured instead of reading a
        // missing 'cds' key.
        if (isset($this->slaveConfig['cds'])) {
            $connections->release($this->slaveConfig['cds'], false);
        }
}

    /**
     * Disconnects the master connection and, when one is configured, the
     * slave connection through the pool.
     *
     * Also used by release() when running outside a worker/process context.
     *
     * @throws Exception
     */
    public function clear_connection()
    {
        $connections = $this->connections();
        $connections->disconnect($this->cds, true);
        // slaveConfig may be empty; reading its 'cds' key unguarded raises a
        // PHP warning, so only disconnect a slave that is actually configured.
        if (isset($this->slaveConfig['cds'])) {
            $connections->disconnect($this->slaveConfig['cds'], false);
        }
    }

    /**
     * Disconnects the master connection and, when one is configured, the
     * slave connection through the pool.
     *
     * @throws Exception
     */
    public function disconnect()
    {
        $connections = $this->connections();
        $connections->disconnect($this->cds, true);
        // Same guard as clear_connection(): no slave configured, nothing to do.
        if (isset($this->slaveConfig['cds'])) {
            $connections->disconnect($this->slaveConfig['cds'], false);
        }
    }
}