eee
This commit is contained in:
+63
-21
@@ -26,11 +26,13 @@ use Kiri\Events\EventProvider;
|
||||
use Kiri\Exception\NotFindClassException;
|
||||
use PDO;
|
||||
use Kiri\Error\StdoutLogger;
|
||||
use PDOStatement;
|
||||
use Psr\Log\LoggerInterface;
|
||||
use ReflectionException;
|
||||
use Kiri\Server\Events\OnWorkerStart;
|
||||
use Kiri\Server\Events\OnTaskerStart;
|
||||
use Kiri\Server\Events\OnAfterRequest;
|
||||
use Kiri\Di\Inject\Container;
|
||||
use Swoole\Timer;
|
||||
|
||||
/**
|
||||
* Class Connection
|
||||
@@ -39,11 +41,11 @@ use Kiri\Di\Inject\Container;
|
||||
class Connection extends Component
|
||||
{
|
||||
|
||||
public string $id = 'db';
|
||||
public string $cds = '';
|
||||
public string $id = 'db';
|
||||
public string $cds = '';
|
||||
public string $password = '';
|
||||
public string $username = '';
|
||||
public string $charset = 'utf-8';
|
||||
public string $charset = 'utf-8';
|
||||
|
||||
public string $tablePrefix = '';
|
||||
|
||||
@@ -115,6 +117,48 @@ class Connection extends Component
|
||||
$eventProvider->on(Commit::class, [$this, 'commit'], 0);
|
||||
$eventProvider->on(OnAfterRequest::class, [$this, 'clear']);
|
||||
$eventProvider->on(OnWorkerExit::class, [$this, 'disconnect']);
|
||||
$eventProvider->on(OnWorkerStart::class, [$this, 'tick']);
|
||||
$eventProvider->on(OnTaskerStart::class, [$this, 'tick']);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return void
|
||||
*/
|
||||
public function tick(): void
|
||||
{
|
||||
Timer::tick(10000, fn() => $this->checkClientHealth($this->pool()));
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param Pool $pool
|
||||
* @return void
|
||||
* @throws Exception
|
||||
*/
|
||||
protected function checkClientHealth(Pool $pool): void
|
||||
{
|
||||
$length = $pool->size($this->cds);
|
||||
for ($i = 0; $i < $length; $i++) {
|
||||
try {
|
||||
$bool = $pool->get($this->cds);
|
||||
if ($bool === false) {
|
||||
break;
|
||||
}
|
||||
/** @var PDO $client */
|
||||
[$client, $time] = $bool;
|
||||
if ((time() - $time) > $this->idle_time) {
|
||||
throw new Exception('Client timeout.');
|
||||
}
|
||||
if ($client->query('select 1') === false) {
|
||||
throw new Exception($client->errorInfo()[1]);
|
||||
}
|
||||
$pool->push($this->cds, [$client, time()]);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->logger->error(throwable($exception));
|
||||
$pool->abandon($this->cds);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -129,7 +173,7 @@ class Connection extends Component
|
||||
if ($this->_schema === null) {
|
||||
$this->_schema = Kiri::createObject([
|
||||
'class' => Schema::class,
|
||||
'db' => $this
|
||||
'db' => $this
|
||||
]);
|
||||
}
|
||||
return $this->_schema;
|
||||
@@ -162,14 +206,13 @@ class Connection extends Component
|
||||
}
|
||||
|
||||
[$client, $time] = $data;
|
||||
if ((time() - $time) < $this->idle_time && $this->canUse($client)) {
|
||||
if ((time() - $time) < $this->idle_time) {
|
||||
return $client;
|
||||
}
|
||||
|
||||
$this->logger->error('PDO连接已失效, 空闲超时或已不可用,重新获取.', [$this->cds]);
|
||||
$this->pool()->abandon($this->cds);
|
||||
|
||||
Waite::sleep(10);
|
||||
return $this->getNormalClientHealth();
|
||||
}
|
||||
|
||||
@@ -256,7 +299,7 @@ class Connection extends Component
|
||||
if ($pdo === null) {
|
||||
throw new Exception('Failed to rollback transaction: connection was exists.');
|
||||
}
|
||||
if ($this->inTransaction()) {
|
||||
if ($pdo->inTransaction()) {
|
||||
$pdo->rollback();
|
||||
}
|
||||
$this->pool()->push($this->cds, [$pdo, time()]);
|
||||
@@ -350,28 +393,27 @@ class Connection extends Component
|
||||
|
||||
|
||||
/**
|
||||
* @return array
|
||||
* @return array<PDO, int>
|
||||
*/
|
||||
public function newConnect(): array
|
||||
{
|
||||
$options = array_merge($this->attributes, [
|
||||
PDO::ATTR_CASE => PDO::CASE_NATURAL,
|
||||
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
|
||||
PDO::ATTR_ORACLE_NULLS => PDO::NULL_NATURAL,
|
||||
PDO::ATTR_STRINGIFY_FETCHES => false,
|
||||
PDO::ATTR_EMULATE_PREPARES => true,
|
||||
PDO::ATTR_TIMEOUT => $this->connect_timeout,
|
||||
PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . $this->charset
|
||||
]);
|
||||
$link = new PDO('mysql:dbname=' . $this->database . ';host=' . $this->cds, $this->username, $this->password, $options);
|
||||
return [$link, time()];
|
||||
return [new PDO('mysql:dbname=' . $this->database . ';host=' . $this->cds,
|
||||
$this->username, $this->password, array_merge($this->attributes, [
|
||||
PDO::ATTR_CASE => PDO::CASE_NATURAL,
|
||||
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
|
||||
PDO::ATTR_ORACLE_NULLS => PDO::NULL_NATURAL,
|
||||
PDO::ATTR_STRINGIFY_FETCHES => false,
|
||||
PDO::ATTR_EMULATE_PREPARES => true,
|
||||
PDO::ATTR_TIMEOUT => $this->connect_timeout,
|
||||
PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES ' . $this->charset
|
||||
])), time()];
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return Pool
|
||||
*/
|
||||
private function pool(): Pool
|
||||
protected function pool(): Pool
|
||||
{
|
||||
if (!$this->connections->hasChannel($this->cds)) {
|
||||
$this->connections->created($this->cds, $this->pool['max'] ?? 1, [$this, 'newConnect']);
|
||||
|
||||
Reference in New Issue
Block a user