387 lines
9.3 KiB
PHP
387 lines
9.3 KiB
PHP
<?php
|
|
/**
|
|
* Created by PhpStorm.
|
|
* User: whwyy
|
|
* Date: 2018/3/30 0030
|
|
* Time: 14:09
|
|
*/
|
|
declare(strict_types=1);
|
|
|
|
|
|
namespace Database;
|
|
|
|
|
|
use Database\Affair\BeginTransaction;
|
|
use Database\Affair\Commit;
|
|
use Database\Affair\Rollback;
|
|
use Exception;
|
|
use Kiri;
|
|
use Kiri\Server\Events\OnWorkerExit;
|
|
use Kiri\Abstracts\Component;
|
|
use Kiri\Di\Context;
|
|
use Kiri\Pool\Pool;
|
|
use Kiri\Events\EventProvider;
|
|
use Kiri\Error\StdoutLogger;
|
|
use Psr\Log\LoggerInterface;
|
|
use Kiri\Server\Events\OnWorkerStart;
|
|
use Kiri\Server\Events\OnTaskerStart;
|
|
use Kiri\Server\Events\OnAfterRequest;
|
|
use Kiri\Di\Inject\Container;
|
|
use Swoole\Timer;
|
|
use Database\Base\PDO;
|
|
|
|
//use PDO;
|
|
|
|
/**
|
|
* Class Connection
|
|
* @package Database
|
|
*/
|
|
class Connection extends Component
|
|
{
|
|
|
|
public string $id = 'db';
|
|
public string $cds = '';
|
|
public string $password = '';
|
|
public string $username = '';
|
|
public string $charset = 'utf-8';
|
|
public string $tablePrefix = '';
|
|
public string $database = '';
|
|
public int $timeout = 30;
|
|
public int $waite_time = 3;
|
|
public int $tick_time = 60;
|
|
public int $idle_count = 3;
|
|
public int $idle_time = 60;
|
|
public array $pool = ['max' => 10, 'min' => 1];
|
|
private int $storey = 0;
|
|
protected int $timerId = -1;
|
|
public bool $enableCache = false;
|
|
public string $cacheDriver = 'redis';
|
|
public array $attributes = [];
|
|
public array $slave = [];
|
|
protected ?\Closure $_println = null;
|
|
|
|
|
|
/**
|
|
* @var StdoutLogger
|
|
*/
|
|
#[Container(LoggerInterface::class)]
|
|
public StdoutLogger $logger;
|
|
|
|
|
|
/**
|
|
* @var EventProvider
|
|
*/
|
|
#[Container(EventProvider::class)]
|
|
public EventProvider $eventProvider;
|
|
|
|
|
|
/**
|
|
* @param Pool $connections
|
|
*/
|
|
public function __construct(public Pool $connections)
|
|
{
|
|
parent::__construct();
|
|
|
|
$this->_println = \config('databases.logger', null);
|
|
}
|
|
|
|
|
|
/**
|
|
* @param float $startTime
|
|
* @param float $endTime
|
|
* @param string $sql
|
|
* @param array $params
|
|
* @return void
|
|
*/
|
|
public function println(float $startTime, float $endTime, string $sql, array $params = []): void
|
|
{
|
|
if (is_callable($this->_println)) {
|
|
call_user_func($this->_println, $startTime, $endTime, $sql, $params);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* @return void
|
|
* @throws
|
|
*/
|
|
public function init(): void
|
|
{
|
|
$this->eventProvider->on(BeginTransaction::class, [$this, 'beginTransaction'], 0);
|
|
$this->eventProvider->on(Rollback::class, [$this, 'rollback'], 0);
|
|
$this->eventProvider->on(Commit::class, [$this, 'commit'], 0);
|
|
$this->eventProvider->on(OnAfterRequest::class, [$this, 'clear']);
|
|
$this->eventProvider->on(OnWorkerExit::class, [$this, 'disconnect']);
|
|
$this->eventProvider->on(OnWorkerStart::class, [$this, 'tick']);
|
|
$this->eventProvider->on(OnTaskerStart::class, [$this, 'tick']);
|
|
}
|
|
|
|
|
|
/**
|
|
* @return void
|
|
*/
|
|
public function tick(): void
|
|
{
|
|
$this->timerId = Timer::tick($this->tick_time, fn () => $this->checkClientHealth($this->pool()));
|
|
}
|
|
|
|
|
|
/**
|
|
* @param Pool $pool
|
|
* @return void
|
|
* @throws
|
|
*/
|
|
protected function checkClientHealth(Pool $pool): void
|
|
{
|
|
$pool->flush($this->getName(), $this->pool['min'] ?? 1);
|
|
$length = $pool->size($this->getName());
|
|
for ($i = 0; $i < $length; $i++) {
|
|
try {
|
|
if (($client = $this->validator($pool)) === false) {
|
|
break;
|
|
}
|
|
$pool->push($this->getName(), $client);
|
|
} catch (\Throwable $exception) {
|
|
if (!str_contains($exception->getMessage(), 'Client timeout.')) {
|
|
$this->logger->error(throwable($exception), [$this->cds]);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* @return string
|
|
*/
|
|
private function getName(): string
|
|
{
|
|
return 'mysql.' . $this->cds;
|
|
}
|
|
|
|
|
|
/**
|
|
* @param Pool $pool
|
|
* @return PDO|bool
|
|
* @throws
|
|
*/
|
|
protected function validator(Pool $pool): PDO|bool
|
|
{
|
|
/** @var $client PDO */
|
|
if (($client = $pool->get($this->getName())) === false) {
|
|
return false;
|
|
}
|
|
if ($client->query('select 1') === false) {
|
|
throw new Exception($client->errorInfo()[1]);
|
|
}
|
|
return $client;
|
|
}
|
|
|
|
|
|
/**
|
|
* @return PDO
|
|
* @throws
|
|
*/
|
|
public function getConnection(): PDO
|
|
{
|
|
if (!$this->inTransaction()) {
|
|
return $this->getNormalClientHealth();
|
|
} else {
|
|
return $this->getTransactionClient();
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* @return PDO
|
|
* @throws
|
|
*/
|
|
protected function getNormalClientHealth(): PDO
|
|
{
|
|
$data = $this->pool()->get($this->getName(), $this->waite_time);
|
|
if ($data === false) {
|
|
throw new Exception('Client Waite timeout.');
|
|
}
|
|
return $data;
|
|
}
|
|
|
|
|
|
/**
|
|
* @return $this
|
|
* @throws
|
|
*/
|
|
public function beginTransaction(): static
|
|
{
|
|
if ($this->storey < 0) {
|
|
$this->storey = 0;
|
|
}
|
|
$this->storey++;
|
|
return $this;
|
|
}
|
|
|
|
|
|
/**
|
|
* @return PDO
|
|
* @throws
|
|
*/
|
|
public function getTransactionClient(): PDO
|
|
{
|
|
$pdo = Context::get($this->cds);
|
|
if ($pdo === null) {
|
|
/** @var PDO $pdo */
|
|
$pdo = Context::set($this->cds, $this->getNormalClientHealth());
|
|
}
|
|
if ($this->storey > 0 && !$pdo->inTransaction()) {
|
|
$pdo->beginTransaction();
|
|
}
|
|
return $pdo;
|
|
}
|
|
|
|
|
|
/**
|
|
* @return bool
|
|
* @throws
|
|
*/
|
|
public function inTransaction(): bool
|
|
{
|
|
return $this->storey > 0;
|
|
}
|
|
|
|
|
|
/**
|
|
* @throws
|
|
* 事务回滚
|
|
*/
|
|
public function rollback(): void
|
|
{
|
|
$this->storey--;
|
|
if ($this->storey == 0) {
|
|
if (!Context::exists($this->cds)) {
|
|
return;
|
|
}
|
|
$pdo = $this->getTransactionClient();
|
|
if ($pdo->inTransaction()) {
|
|
$pdo->rollback();
|
|
}
|
|
$this->release($pdo);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* @throws
|
|
* 事务提交
|
|
*/
|
|
public function commit(): void
|
|
{
|
|
$this->storey--;
|
|
if ($this->storey == 0) {
|
|
if (!Context::exists($this->cds)) {
|
|
return;
|
|
}
|
|
$pdo = $this->getTransactionClient();
|
|
if ($pdo->inTransaction()) {
|
|
$pdo->commit();
|
|
}
|
|
$this->release($pdo);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
* @return void
|
|
* @throws
|
|
*/
|
|
public function clear(): void
|
|
{
|
|
/** @var PDO $pdo */
|
|
$pdo = Context::get($this->cds);
|
|
if ($pdo === null) {
|
|
return;
|
|
}
|
|
if ($this->inTransaction()) {
|
|
$pdo->rollback();
|
|
}
|
|
$this->release($pdo);
|
|
}
|
|
|
|
|
|
/**
|
|
* @param string $sql
|
|
* @param array $attributes
|
|
* @return Command
|
|
* @throws
|
|
*/
|
|
public function createCommand(string $sql, array $attributes = []): Command
|
|
{
|
|
return (new Command(['connection' => $this, 'sql' => $sql]))->bindValues($attributes);
|
|
}
|
|
|
|
|
|
/**
|
|
* 回收链接
|
|
* @throws
|
|
*/
|
|
public function release(PDO $pdo): void
|
|
{
|
|
if (!$this->inTransaction()) {
|
|
$this->pool()->push($this->getName(), $pdo);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
*
|
|
* 回收链接
|
|
* @throws
|
|
*/
|
|
public function clear_connection(): void
|
|
{
|
|
$this->pool()->flush($this->getName(), 0);
|
|
}
|
|
|
|
|
|
/**
|
|
* @throws
|
|
*/
|
|
public function disconnect(): void
|
|
{
|
|
if ($this->timerId > -1) {
|
|
Timer::clear($this->timerId);
|
|
}
|
|
$this->pool()->close($this->getName());
|
|
}
|
|
|
|
|
|
/**
|
|
* @return PDO
|
|
*/
|
|
public function newConnect(): \PDO
|
|
{
|
|
$pdo = new PDO($this->database, $this->cds, $this->username, $this->password, [
|
|
\PDO::ATTR_CASE => \PDO::CASE_NATURAL,
|
|
\PDO::ATTR_ERRMODE => \PDO::ERRMODE_EXCEPTION,
|
|
\PDO::ATTR_ORACLE_NULLS => \PDO::NULL_NATURAL,
|
|
\PDO::ATTR_STRINGIFY_FETCHES => false,
|
|
\PDO::ATTR_EMULATE_PREPARES => true,
|
|
\PDO::ATTR_TIMEOUT => $this->timeout,
|
|
\PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . $this->charset
|
|
]);
|
|
foreach ($this->attributes as $key => $attribute) {
|
|
$pdo->setAttribute($key, $attribute);
|
|
}
|
|
return $pdo;
|
|
}
|
|
|
|
|
|
/**
|
|
* @return Pool
|
|
*/
|
|
protected function pool(): Pool
|
|
{
|
|
if (!$this->connections->hasChannel($this->getName())) {
|
|
$this->connections->created($this->getName(), $this->pool['max'] ?? 1, [$this, 'newConnect']);
|
|
}
|
|
return $this->connections;
|
|
}
|
|
|
|
}
|