Files
kiri-core/System/Pool/Connection.php
T

436 lines
9.8 KiB
PHP
Raw Normal View History

2020-08-31 01:27:08 +08:00
<?php
2020-10-29 18:17:25 +08:00
declare(strict_types=1);
2020-08-31 01:27:08 +08:00
namespace Snowflake\Pool;
use HttpServer\Http\Context;
use PDO;
use Exception;
2020-09-18 16:42:28 +08:00
use Snowflake\Abstracts\Config;
2020-08-31 01:27:08 +08:00
use Swoole\Coroutine;
use Snowflake\Abstracts\Pool;
2020-09-06 05:37:00 +08:00
use Swoole\Timer;
2020-08-31 01:27:08 +08:00
/**
* Class Connection
2020-08-31 12:38:32 +08:00
* @package Snowflake\Pool
2020-08-31 01:27:08 +08:00
*/
class Connection extends Pool
{
2020-10-29 18:17:25 +08:00
/** @var array<string, int> per-pool-name client counter: raised by incr(), lowered by desc(), emptied by connection_clear() */
public array $hasCreate = [];
2020-08-31 01:27:08 +08:00
2020-10-29 18:17:25 +08:00
/** @var int value handed to PDO::ATTR_TIMEOUT when newClient() opens a connection */
public int $timeout = 1900;
2020-08-31 01:27:08 +08:00
/** @var PDO[] NOTE(review): not referenced by any method visible in this file - confirm it is still needed */
2020-10-29 18:17:25 +08:00
protected array $connections = [];
2020-08-31 01:27:08 +08:00
2020-09-04 19:10:08 +08:00
2020-10-29 18:17:25 +08:00
/** @var int id returned by Timer::tick() for the heartbeat timer; 0 means no timer is registered */
private int $creates = 0;
2020-09-04 19:10:08 +08:00
2020-09-06 05:37:00 +08:00
2020-10-29 18:17:25 +08:00
/** @var int time() of the most recent release(); 0 means nothing has been released yet */
public int $lastTime = 0;
2020-09-06 05:37:00 +08:00
/**
 * Heartbeat callback registered through Timer::tick().
 *
 * Stores the id of the timer that fired, then trims the pool based on how
 * long ago a client was last released: after more than 600 idle seconds every
 * channel is drained (flush(0), which also stops the timer); after more than
 * 300 idle seconds each channel is cut down to at most two clients.
 *
 * @param $timer id of the timer that fired
 */
public function Heartbeat_detection($timer)
{
    $this->creates = $timer;

    // Nothing has been released yet, so there is no idle period to measure.
    if ($this->lastTime == 0) {
        return;
    }

    if ($this->lastTime + 600 < time()) {
        $this->flush(0);
        return;
    }

    if ($this->lastTime + 300 < time()) {
        $this->flush(2);
    }
}
2020-09-06 05:37:00 +08:00
2020-09-06 05:39:35 +08:00
/**
 * Shrinks every channel returned by getChannels() to at most $retain_number
 * entries. When nothing is to be retained, the heartbeat timer is cleared too
 * and its stored id is reset to 0.
 *
 * @param $retain_number number of entries to leave in each channel
 */
protected function flush($retain_number)
{
    foreach ($this->getChannels() as $name => $channel) {
        $this->pop($channel, $name, $retain_number);
    }

    if ($retain_number != 0) {
        return;
    }

    // The pool is now empty: stop the heartbeat until a new client is created.
    $this->debug('release Timer::tick');
    Timer::clear($this->creates);
    $this->creates = 0;
}
/**
 * Pops entries off $channel until no more than $retain_number remain,
 * discarding each popped client and lowering the counter for $name once per
 * popped entry.
 *
 * @param $channel       channel object exposing length() and pop()
 * @param $name          pool name whose counter is decremented
 * @param $retain_number number of entries to leave in the channel
 */
protected function pop($channel, $name, $retain_number)
{
    while ($channel->length() > $retain_number) {
        // Each entry is read as a two-element list; only the second element
        // (the client) matters here, and it is dropped immediately.
        [, $client] = $channel->pop();
        unset($client);

        $this->desc($name);
    }
}
2020-08-31 01:27:08 +08:00
/**
 * Replaces the timeout that newClient() passes as PDO::ATTR_TIMEOUT.
 *
 * @param $timeout new timeout value
 */
public function setTimeout($timeout)
{
    $this->timeout = $timeout;
}
/**
 * Stores $value in the `max` property.
 *
 * NOTE(review): `max` is not declared in this file; presumably it is the
 * pool capacity defined by the parent class - confirm.
 *
 * @param $value new value for `max`
 */
public function setLength($value)
{
    $this->max = $value;
}
/**
 * Reports whether the current context has an open (possibly nested)
 * transaction on the master pool for $cds.
 *
 * beginTransaction() raises the 'begin_<pool name>' counter and
 * commit()/rollback() bring it back to 0, so a transaction is open exactly
 * when that counter exists and is greater than zero.
 *
 * @param $cds connection string identifying the database
 * @return bool true while at least one beginTransaction() is still pending
 */
public function inTransaction($cds): bool
{
    $counterKey = 'begin_' . $this->name($cds, true);
    if (!Context::hasContext($counterKey)) {
        return false;
    }

    // A counter of 0 means every begin has been committed or rolled back.
    return Context::getContext($counterKey) > 0;
}
/**
 * Opens one more transaction level for the master pool of $coroutineName.
 *
 * The nesting depth is kept in the context under 'begin_<pool name>'. After
 * raising it, a PDO client already bound to the current context is switched
 * into a transaction unless it is already in one. When no client is bound
 * yet, only the counter changes; newClient() starts the transaction itself
 * when it sees a positive counter.
 *
 * @param $coroutineName connection string identifying the database
 */
public function beginTransaction($coroutineName)
{
    $coroutineName = $this->name($coroutineName, true);
    $counterKey = 'begin_' . $coroutineName;

    if (!Context::hasContext($counterKey)) {
        Context::setContext($counterKey, 0);
    }
    Context::autoIncr($counterKey);

    // Defensive: only continue when the counter really shows an open level.
    if (Context::getContext($counterKey) < 1) {
        return;
    }

    // The inTransaction() guard makes this safe for nested calls as well.
    $connection = Context::getContext($coroutineName);
    if ($connection instanceof PDO && !$connection->inTransaction()) {
        $connection->beginTransaction();
    }
}
/**
 * Closes one transaction level for the master pool of $coroutineName and
 * commits on the bound PDO client once the outermost level is reached.
 *
 * The nesting counter is clamped back to 0 as soon as the outermost level is
 * reached, whether or not a client is bound, so an unbalanced commit cannot
 * leave a negative counter behind (rollback() resets it the same way).
 *
 * @param $coroutineName connection string identifying the database
 */
public function commit($coroutineName)
{
    $coroutineName = $this->name($coroutineName, true);
    $counterKey = 'begin_' . $coroutineName;

    if (!Context::hasContext($counterKey)) {
        return;
    }

    // Still inside a nested level: the outer caller performs the real commit.
    if (Context::autoDecr($counterKey) > 0) {
        return;
    }

    Context::setContext($counterKey, 0);

    $connection = Context::getContext($coroutineName);
    if (!($connection instanceof PDO)) {
        return;
    }

    if ($connection->inTransaction()) {
        $this->info('connection commit.');
        $connection->commit();
    }
}
/**
 * Builds a two-element list from the current coroutine id and the pool name
 * derived from $name and $isMaster.
 *
 * @param $name
 * @param false $isMaster
 * @return array [coroutine id, pool name]
 */
private function getIndex($name, $isMaster = false): array
{
    $coroutineId = Coroutine::getCid();
    $poolName = $this->name($name, $isMaster);

    return [$coroutineId, $poolName];
}
/**
 * Closes one transaction level for the master pool of $coroutineName. Once
 * the outermost level is reached, a client bound to the current context is
 * rolled back if it is in a transaction, and the nesting counter is reset
 * to 0.
 *
 * @param $coroutineName connection string identifying the database
 */
public function rollback($coroutineName)
{
    $coroutineName = $this->name($coroutineName, true);
    $counterKey = 'begin_' . $coroutineName;

    // No counter, or still inside a nested level: nothing to roll back yet.
    if (!Context::hasContext($counterKey) || Context::autoDecr($counterKey) > 0) {
        return;
    }

    if ($this->hasClient($coroutineName)) {
        /** @var PDO $client */
        $client = Context::getContext($coroutineName);
        if ($client->inTransaction()) {
            $this->info('connection rollBack.');
            $client->rollBack();
        }
    }

    Context::setContext($counterKey, 0);
}
/**
 * Returns the client for the pool selected by $config['cds'] and $isMaster.
 *
 * A client already bound to the current context is returned as is.
 * Otherwise one is taken from the pool when size() reports at least one
 * entry, or created through newClient() when it does not. The result is
 * stored in the context and the value returned by Context::setContext() is
 * handed back.
 *
 * @param array $config connection settings; 'cds' selects the pool
 * @param bool $isMaster
 * @return mixed
 * @throws Exception when the entry taken from the pool is not a PDO instance
 */
public function getConnection(array $config, $isMaster = false): mixed
{
    $coroutineName = $this->name($config['cds'], $isMaster);
    $this->hasCreate[$coroutineName] ??= 0;

    if (Context::hasContext($coroutineName)) {
        return Context::getContext($coroutineName);
    }

    if ($this->size($coroutineName) >= 1) {
        // Pool entries are read as two-element lists; the client is second.
        [, $client] = $this->get($coroutineName);
        if (!($client instanceof PDO)) {
            throw new Exception('Database exception.');
        }
    } else {
        $client = $this->newClient($config, $coroutineName);
    }

    return Context::setContext($coroutineName, $client);
}
/**
 * Creates a new PDO client for $coroutineName by handing a factory closure
 * to createConnect().
 *
 * NOTE(review): createConnect() is not defined in this file; presumably it
 * calls the closure with the values produced by parseConfig() - confirm.
 *
 * The factory opens the connection, applies the error-mode, fetch and null
 * handling attributes, optionally issues SET NAMES, raises the per-pool
 * counter, begins a transaction when the context holds a positive
 * 'begin_<pool name>' counter, and registers the 10-second heartbeat timer
 * if none is registered yet.
 *
 * @param $config
 * @param $coroutineName
 * @return PDO|null
 * @throws Exception
 */
private function newClient($config, $coroutineName): PDO|null
{
    $arguments = $this->parseConfig($config, $coroutineName);

    $factory = function ($cds, $username, $password, $charset, $coroutineName) {
        $options = [
            PDO::ATTR_EMULATE_PREPARES => false,
            PDO::ATTR_CASE             => PDO::CASE_NATURAL,
            PDO::ATTR_TIMEOUT          => $this->timeout,
        ];
        $pdo = new PDO($cds, $username, $password, $options);

        $attributes = [
            PDO::ATTR_ERRMODE           => PDO::ERRMODE_EXCEPTION,
            PDO::ATTR_STRINGIFY_FETCHES => false,
            PDO::ATTR_ORACLE_NULLS      => PDO::NULL_EMPTY_STRING,
        ];
        foreach ($attributes as $attribute => $setting) {
            $pdo->setAttribute($attribute, $setting);
        }

        if (!empty($charset)) {
            $pdo->query('SET NAMES ' . $charset);
        }

        $this->success('create db client -> ' . $cds . ':' . env('workerId', 0));
        $this->incr($coroutineName);

        // A transaction requested before the client existed is started now.
        $depth = Context::getContext('begin_' . $coroutineName, Coroutine::getCid());
        if ($depth && $depth > 0) {
            $pdo->beginTransaction();
        }

        if ($this->creates === 0) {
            $this->creates = Timer::tick(10000, [$this, 'Heartbeat_detection']);
        }

        return $pdo;
    };

    return $this->createConnect($arguments, $coroutineName, $factory);
}
2020-09-18 16:42:28 +08:00
/**
 * Flattens the connection settings into the positional list
 * [cds, username, password, charset, name]; charset falls back to 'utf8mb4'
 * when the config does not provide one.
 *
 * @param $config settings with 'cds', 'username', 'password' and optional 'charset'
 * @param $name   pool name appended as the last element
 * @return array
 */
private function parseConfig($config, $name): array
{
    $cds = $config['cds'];
    $username = $config['username'];
    $password = $config['password'];
    $charset = $config['charset'] ?? 'utf8mb4';

    return [$cds, $username, $password, $charset, $name];
}
2020-08-31 01:27:08 +08:00
/**
 * Hands the client bound to the current context back to the pool.
 *
 * Does nothing when no client is bound. A client that is still inside a
 * transaction is committed first; afterwards the client is pushed to the
 * pool, unbound from the context, and the release time is recorded for the
 * heartbeat.
 *
 * @param $coroutineName connection string identifying the database
 * @param $isMaster
 */
public function release($coroutineName, $isMaster)
{
    $coroutineName = $this->name($coroutineName, $isMaster);

    if ($this->hasClient($coroutineName)) {
        /** @var PDO $pdo */
        $pdo = Context::getContext($coroutineName);
        if ($pdo->inTransaction()) {
            $pdo->commit();
        }

        $this->push($coroutineName, $pdo);
        $this->remove($coroutineName);
        $this->lastTime = time();
    }
}
/**
 * Tells whether the current context holds an entry under $coroutineName.
 *
 * @param $coroutineName context key (pool name)
 * @return bool
 */
private function hasClient($coroutineName): bool
{
    return Context::hasContext($coroutineName);
}
/**
 * Batch release: every PDO instance found in the context is committed if it
 * is inside a transaction, pushed back to the pool under its context key and
 * unbound from the context. Finally the per-pool counters are emptied and the
 * stored timer id is reset to 0.
 *
 * NOTE(review): the timer id is zeroed without calling Timer::clear() -
 * confirm that a still-running heartbeat timer is intended here.
 */
public function connection_clear()
{
    $this->debug('receive all clients.');

    foreach (Context::getAllContext() as $name => $entry) {
        // Context entries that are not PDO clients are left untouched.
        if (!($entry instanceof PDO)) {
            continue;
        }

        if ($entry->inTransaction()) {
            $entry->commit();
        }
        $this->push($name, $entry);
        $this->remove($name);
    }

    $this->hasCreate = [];
    $this->creates = 0;
}
/**
 * Deletes the context entry stored under $coroutineName.
 *
 * @param $coroutineName context key (pool name)
 */
public function remove($coroutineName)
{
    Context::deleteId($coroutineName);
}
/**
 * Decides whether a pooled client may be handed out again.
 *
 * The client must be a PDO instance. It is accepted without touching the
 * server when its timestamp is less than ten minutes old; otherwise the
 * server is probed through PDO::ATTR_SERVER_INFO. Any error raised while
 * checking counts as "not usable". Whenever the client is rejected, the
 * counter for $name is lowered by one.
 *
 * The type check runs before the freshness shortcut so that a recent entry
 * holding something other than a PDO instance is rejected instead of being
 * reported as usable.
 *
 * @param $name   pool name whose counter is decremented on rejection
 * @param $time   timestamp (seconds) stored alongside the client
 * @param $client candidate client
 * @return bool true when the client can be reused
 */
public function checkCanUse($name, $time, $client): bool
{
    $usable = false;

    try {
        if ($client instanceof PDO) {
            $usable = $time + 60 * 10 > time()
                || (bool)$client->getAttribute(PDO::ATTR_SERVER_INFO);
        }
    } catch (\Throwable $exception) {
        // A failed probe (for example a dropped connection) means the client is dead.
        $usable = false;
    }

    if (!$usable) {
        $this->desc($name);
    }

    return $usable;
}
/**
2020-09-05 01:35:52 +08:00
* @param $coroutineName
2020-08-31 01:27:08 +08:00
* @param $cds
* @param $username
* @param $password
2020-09-18 16:42:28 +08:00
* @param string $charset
2020-08-31 01:27:08 +08:00
* @throws Exception
*/
2021-01-04 18:40:11 +08:00
// public function createConnect($coroutineName, $cds, $username, $password, $charset = 'utf8mb4'): void
// {
// try {
// if (Context::hasContext('at_create_db')) {
// return;
// }
// Context::setContext('at_create_db', 1);
// $link = new PDO($cds, $username, $password, [
// PDO::ATTR_EMULATE_PREPARES => false,
// PDO::ATTR_CASE => PDO::CASE_NATURAL,
// PDO::ATTR_TIMEOUT => $this->timeout,
// ]);
// $link->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
// $link->setAttribute(PDO::ATTR_STRINGIFY_FETCHES, false);
// $link->setAttribute(PDO::ATTR_ORACLE_NULLS, PDO::NULL_EMPTY_STRING);
// if (!empty($charset)) {
// $link->query('SET NAMES ' . $charset);
// }
// $this->success('create db client -> ' . $cds . ':' . $this->hasCreate[$coroutineName] . ':' . $this->size($coroutineName));
// $this->incr($coroutineName);
// if ($number = Context::getContext('begin_' . $coroutineName, Coroutine::getCid())) {
// $number > 0 && $link->beginTransaction();
// }
// if ($this->creates === 0) {
// $this->creates = Timer::tick(10000, [$this, 'Heartbeat_detection']);
// }
// $this->push($coroutineName, $link);
// } catch (\Throwable $exception) {
// $this->addError($cds . ' -> ' . $exception->getMessage());
// if ($exception->getCode() === 2006) {
// $this->createConnect($coroutineName, $cds, $username, $password, $charset);
// }
// throw new Exception($exception->getMessage());
// } finally {
// Context::deleteId('at_create_db');
// }
// }
2020-08-31 01:27:08 +08:00
/**
 * Drops the client bound to the current context under $coroutineName: the
 * context entry is removed and clean() is invoked for the same name. Does
 * nothing when no client is bound.
 *
 * @param $coroutineName context key (pool name)
 */
public function disconnect($coroutineName)
{
    if ($this->hasClient($coroutineName)) {
        $this->remove($coroutineName);
        $this->clean($coroutineName);
    }
}
/**
 * Raises the client counter for $coroutineName by one, starting from 0 when
 * no counter exists yet.
 *
 * @param $coroutineName pool name
 */
public function incr($coroutineName)
{
    $current = $this->hasCreate[$coroutineName] ?? 0;
    $this->hasCreate[$coroutineName] = $current + 1;
}
/**
 * Lowers the client counter for $name by one, starting from 0 when no
 * counter exists yet (so the value may become negative).
 *
 * @param string $name pool name
 */
public function desc(string $name)
{
    $current = $this->hasCreate[$name] ?? 0;
    $this->hasCreate[$name] = $current - 1;
}
}